PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
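Value:
( \
 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)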

Definition at line 193 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
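( \
 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \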
)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 188 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:
( \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)

Definition at line 184 of file reorderbuffer.c.

Typedef Documentation

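◆ ReorderBufferDiskChange

typedef struct ReorderBufferDiskChange ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

typedef struct ReorderBufferToastEnt ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

typedef struct RewriteMappingFile RewriteMappingFile
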
◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5082 of file reorderbuffer.c.

5083 {
5084  char path[MAXPGPATH];
5085  int fd;
5086  int readBytes;
5087  LogicalRewriteMappingData map;
5088 
5089  sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5090  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5091  if (fd < 0)
5092  ereport(ERROR,
5093  (errcode_for_file_access(),
5094  errmsg("could not open file \"%s\": %m", path)));
5095 
5096  while (true)
5097  {
5098  ReorderBufferTupleCidKey key;
5099  ReorderBufferTupleCidEnt *ent;
5100  ReorderBufferTupleCidEnt *new_ent;
5101  bool found;
5102 
5103  /* be careful about padding */
5104  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5105 
5106  /* read all mappings till the end of the file */
5107  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5108  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5109  pgstat_report_wait_end();
5110 
5111  if (readBytes < 0)
5112  ereport(ERROR,
5113  (errcode_for_file_access(),
5114  errmsg("could not read file \"%s\": %m",
5115  path)));
5116  else if (readBytes == 0) /* EOF */
5117  break;
5118  else if (readBytes != sizeof(LogicalRewriteMappingData))
5119  ereport(ERROR,
5120  (errcode_for_file_access(),
5121  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5122  path, readBytes,
5123  (int32) sizeof(LogicalRewriteMappingData))));
5124 
5125  key.rlocator = map.old_locator;
5126  ItemPointerCopy(&map.old_tid,
5127  &key.tid);
5128 
5129 
5130  ent = (ReorderBufferTupleCidEnt *)
5131  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5132 
5133  /* no existing mapping, no need to update */
5134  if (!ent)
5135  continue;
5136 
5137  key.rlocator = map.new_locator;
5138  ItemPointerCopy(&map.new_tid,
5139  &key.tid);
5140 
5141  new_ent = (ReorderBufferTupleCidEnt *)
5142  hash_search(tuplecid_data, &key, HASH_ENTER, &found);
5143 
5144  if (found)
5145  {
5146  /*
5147  * Make sure the existing mapping makes sense. We sometime update
5148  * old records that did not yet have a cmax (e.g. pg_class' own
5149  * entry while rewriting it) during rewrites, so allow that.
5150  */
5151  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5152  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5153  }
5154  else
5155  {
5156  /* update mapping */
5157  new_ent->cmin = ent->cmin;
5158  new_ent->cmax = ent->cmax;
5159  new_ent->combocid = ent->combocid;
5160  }
5161  }
5162 
5163  if (CloseTransientFile(fd) != 0)
5164  ereport(ERROR,
5165  (errcode_for_file_access(),
5166  errmsg("could not close file \"%s\": %m", path)));
5167 }
#define InvalidCommandId
Definition: c.h:660
signed int int32
Definition: c.h:496
#define Assert(condition)
Definition: c.h:849
#define PG_BINARY
Definition: c.h:1264
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2832
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2656
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:102
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 991 of file reorderbuffer.c.

992 {
993 #ifdef USE_ASSERT_CHECKING
994  dlist_iter iter;
995  XLogRecPtr prev_lsn = txn->first_lsn;
996 
997  dlist_foreach(iter, &txn->changes)
998  {
999  ReorderBufferChange *cur_change;
1000 
1001  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1002 
1003  Assert(txn->first_lsn != InvalidXLogRecPtr);
1004  Assert(cur_change->lsn != InvalidXLogRecPtr);
1005  Assert(txn->first_lsn <= cur_change->lsn);
1006 
1007  if (txn->end_lsn != InvalidXLogRecPtr)
1008  Assert(cur_change->lsn <= txn->end_lsn);
1009 
1010  Assert(prev_lsn <= cur_change->lsn);
1011 
1012  prev_lsn = cur_change->lsn;
1013  }
1014 #endif
1015 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 920 of file reorderbuffer.c.

921 {
922 #ifdef USE_ASSERT_CHECKING
923  LogicalDecodingContext *ctx = rb->private_data;
924  dlist_iter iter;
925  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
926  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
927 
928  /*
929  * Skip the verification if we don't reach the LSN at which we start
930  * decoding the contents of transactions yet because until we reach the
931  * LSN, we could have transactions that don't have the association between
932  * the top-level transaction and subtransaction yet and consequently have
933  * the same LSN. We don't guarantee this association until we try to
934  * decode the actual contents of transaction. The ordering of the records
935  * prior to the start_decoding_at LSN should have been checked before the
936  * restart.
937  */
938  if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
939  return;
940 
941  dlist_foreach(iter, &rb->toplevel_by_lsn)
942  {
943  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
944  iter.cur);
945 
946  /* start LSN must be set */
947  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
948 
949  /* If there is an end LSN, it must be higher than start LSN */
950  if (cur_txn->end_lsn != InvalidXLogRecPtr)
951  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
952 
953  /* Current initial LSN must be strictly higher than previous */
954  if (prev_first_lsn != InvalidXLogRecPtr)
955  Assert(prev_first_lsn < cur_txn->first_lsn);
956 
957  /* known-as-subtxn txns must not be listed */
958  Assert(!rbtxn_is_known_subxact(cur_txn));
959 
960  prev_first_lsn = cur_txn->first_lsn;
961  }
962 
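963  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
964  {
965  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,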
966  base_snapshot_node,
967  iter.cur);
968 
969  /* base snapshot (and its LSN) must be set */
970  Assert(cur_txn->base_snapshot != NULL);
971  Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
972 
973  /* current LSN must be strictly higher than previous */
974  if (prev_base_snap_lsn != InvalidXLogRecPtr)
975  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
976 
977  /* known-as-subtxn txns must not be listed */
978  Assert(!rbtxn_is_known_subxact(cur_txn));
979 
980  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
981  }
982 #endif
983 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 5184 of file reorderbuffer.c.

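5185 {
5186  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5187  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);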
5188 
5189  return pg_cmp_u64(a->lsn, b->lsn);
5190 }
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:616
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2931 of file reorderbuffer.c.

2933 {
2934  ReorderBufferTXN *txn;
2935 
2936  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2937  false);
2938 
2939  /* unknown, nothing to remove */
2940  if (txn == NULL)
2941  return;
2942 
2943  txn->xact_time.abort_time = abort_time;
2944 
2945  /* For streamed transactions notify the remote node about the abort. */
2946  if (rbtxn_is_streamed(txn))
2947  {
2948  rb->stream_abort(rb, txn, lsn);
2949 
2950  /*
2951  * We might have decoded changes for this transaction that could load
2952  * the cache as per the current transaction's view (consider DDL's
2953  * happened in this transaction). We don't want the decoding of future
2954  * transactions to use those cache entries so execute invalidations.
2955  */
2956  if (txn->ninvalidations > 0)
2957  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2958  txn->invalidations);
2959  }
2960 
2961  /* cosmetic... */
2962  txn->final_lsn = lsn;
2963 
2964  /* remove potential on-disk data, and deallocate */
2965  ReorderBufferCleanupTXN(rb, txn);
2966 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@111 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2976 of file reorderbuffer.c.

2977 {
2978  dlist_mutable_iter it;
2979 
2980  /*
2981  * Iterate through all (potential) toplevel TXNs and abort all that are
2982  * older than what possibly can be running. Once we've found the first
2983  * that is alive we stop, there might be some that acquired an xid earlier
2984  * but started writing later, but it's unlikely and they will be cleaned
2985  * up in a later call to this function.
2986  */
2987  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2988  {
2989  ReorderBufferTXN *txn;
2990 
2991  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2992 
2993  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2994  {
2995  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2996 
2997  /* Notify the remote node about the crash/immediate restart. */
2998  if (rbtxn_is_streamed(txn))
2999  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3000 
3001  /* remove potential on-disk data, and deallocate this tx */
3002  ReorderBufferCleanupTXN(rb, txn);
3003  }
3004  else
3005  return;
3006  }
3007 }
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:225
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3329 of file reorderbuffer.c.

3332 {
3333  ReorderBufferTXN *txn;
3334  MemoryContext oldcontext;
3335  ReorderBufferChange *change;
3336 
3337  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3338 
3339  oldcontext = MemoryContextSwitchTo(rb->context);
3340 
3341  /*
3342  * Collect all the invalidations under the top transaction, if available,
3343  * so that we can execute them all together. See comments atop this
3344  * function.
3345  */
3346  txn = rbtxn_get_toptxn(txn);
3347 
3348  Assert(nmsgs > 0);
3349 
3350  /* Accumulate invalidations. */
3351  if (txn->ninvalidations == 0)
3352  {
3353  txn->ninvalidations = nmsgs;
3354  txn->invalidations = (SharedInvalidationMessage *)
3355  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3356  memcpy(txn->invalidations, msgs,
3357  sizeof(SharedInvalidationMessage) * nmsgs);
3358  }
3359  else
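3360  {
3361  txn->invalidations = (SharedInvalidationMessage *)
3362  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *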
3363  (txn->ninvalidations + nmsgs));
3364 
3365  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3366  nmsgs * sizeof(SharedInvalidationMessage));
3367  txn->ninvalidations += nmsgs;
3368  }
3369 
3370  change = ReorderBufferGetChange(rb);
3371  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3372  change->data.inval.ninvalidations = nmsgs;
3373  change->data.inval.invalidations = (SharedInvalidationMessage *)
3374  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3375  memcpy(change->data.inval.invalidations, msgs,
3376  sizeof(SharedInvalidationMessage) * nmsgs);
3377 
3378  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3379 
3380  MemoryContextSwitchTo(oldcontext);
3381 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContextSwitchTo(old_ctx)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@105 data
struct ReorderBufferChange::@105::@110 inval
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3292 of file reorderbuffer.c.

3296 {
3297  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3298  ReorderBufferTXN *txn;
3299 
3300  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3301 
3302  change->data.tuplecid.locator = locator;
3303  change->data.tuplecid.tid = tid;
3304  change->data.tuplecid.cmin = cmin;
3305  change->data.tuplecid.cmax = cmax;
3306  change->data.tuplecid.combocid = combocid;
3307  change->lsn = lsn;
3308  change->txn = txn;
3309  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3310 
3311  dlist_push_tail(&txn->tuplecids, &change->node);
3312  txn->ntuplecids++;
3313 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
struct ReorderBufferChange::@105::@109 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 309 of file reorderbuffer.c.

310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
318  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
319  "ReorderBuffer",
320  ALLOCSET_DEFAULT_SIZES);
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
331  SLAB_DEFAULT_BLOCK_SIZE,
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
336  SLAB_DEFAULT_BLOCK_SIZE,
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * To minimize memory fragmentation caused by long-running transactions
341  * with changes spanning multiple memory blocks, we use a single
342  * fixed-size memory block for decoded tuple storage. The performance
343  * testing showed that the default memory block size maintains logical
344  * decoding performance without causing fragmentation due to concurrent
345  * transactions. One might think that we can use the max size as
346  * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
347  * the memory fragmentation.
348  */
349  buffer->tup_context = GenerationContextCreate(new_ctx,
350  "Tuples",
351  SLAB_DEFAULT_BLOCK_SIZE,
352  SLAB_DEFAULT_BLOCK_SIZE,
353  SLAB_DEFAULT_BLOCK_SIZE);
354 
355  hash_ctl.keysize = sizeof(TransactionId);
356  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
357  hash_ctl.hcxt = buffer->context;
358 
359  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
360  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
361 
362  buffer->by_txn_last_xid = InvalidTransactionId;
363  buffer->by_txn_last_txn = NULL;
364 
365  buffer->outbuf = NULL;
366  buffer->outbufsize = 0;
367  buffer->size = 0;
368 
369  /* txn_heap is ordered by transaction size */
370  buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
371 
372  buffer->spillTxns = 0;
373  buffer->spillCount = 0;
374  buffer->spillBytes = 0;
375  buffer->streamTxns = 0;
376  buffer->streamCount = 0;
377  buffer->streamBytes = 0;
378  buffer->totalTxns = 0;
379  buffer->totalBytes = 0;
380 
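381  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
382 
383  dlist_init(&buffer->toplevel_by_lsn);
384  dlist_init(&buffer->txns_by_base_snapshot_lsn);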
385  dclist_init(&buffer->catchange_txns);
386 
387  /*
388  * Ensure there's no stale data from prior uses of this slot, in case some
389  * prior exit avoided calling ReorderBufferFree. Failure to do this can
390  * produce duplicated txns, and it's very cheap if there's nothing there.
391  */
392  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
393 
394  return buffer;
395 }
#define NameStr(name)
Definition: c.h:737
uint32 TransactionId
Definition: c.h:643
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:181
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1994 of file reorderbuffer.c.

/*
 * Hand one change to a callback stored on the reorder buffer:
 * rb->stream_change when 'streaming' is true, rb->apply_change otherwise.
 * Both callbacks receive the same (rb, txn, relation, change) arguments.
 */
1997 {
1998  if (streaming)
1999  rb->stream_change(rb, txn, relation, change);
2000  else
2001  rb->apply_change(rb, txn, relation, change);
2002 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 2022 of file reorderbuffer.c.

/*
 * Forward a message change to rb->stream_message (when 'streaming' is true)
 * or to rb->message. Both calls pass the change's LSN, the message prefix,
 * the message size and the message payload taken from change->data.msg.
 *
 * NOTE(review): the literal 'true' passed as fourth argument is presumably
 * the "transactional" flag of the message callbacks -- confirm against the
 * callback typedefs.
 */
2024 {
2025  if (streaming)
2026  rb->stream_message(rb, txn, change->lsn, true,
2027  change->data.msg.prefix,
2028  change->data.msg.message_size,
2029  change->data.msg.message);
2030  else
2031  rb->message(rb, txn, change->lsn, true,
2032  change->data.msg.prefix,
2033  change->data.msg.message_size,
2034  change->data.msg.message);
2035 }
struct ReorderBufferChange::@105::@108 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  nrelations,
Relation *  relations,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 2008 of file reorderbuffer.c.

/*
 * Hand a truncate change to a callback stored on the reorder buffer:
 * rb->stream_truncate when 'streaming' is true, rb->apply_truncate
 * otherwise. Both receive (rb, txn, nrelations, relations, change).
 */
2011 {
2012  if (streaming)
2013  rb->stream_truncate(rb, txn, nrelations, relations, change);
2014  else
2015  rb->apply_truncate(rb, txn, nrelations, relations, change);
2016 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1077 of file reorderbuffer.c.

/*
 * Record that 'subxid' is a subtransaction of the top-level transaction
 * 'xid'.
 *
 * Both transactions are looked up through ReorderBufferTXNByXid(). If the
 * subtransaction entry already existed and is already flagged as a known
 * subxact, nothing is done. If it existed but was not flagged, it is first
 * unlinked from the list it was on. The entry is then flagged
 * RBTXN_IS_SUBXACT, pointed at its top-level transaction, appended to the
 * parent's subtxns list (incrementing nsubtxns), and
 * ReorderBufferTransferSnapToParent() is called for the pair.
 */
1079 {
1080  ReorderBufferTXN *txn;
1081  ReorderBufferTXN *subtxn;
1082  bool new_top;
1083  bool new_sub;
1084 
1085  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1086  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1087 
1088  if (!new_sub)
1089  {
1090  if (rbtxn_is_known_subxact(subtxn))
1091  {
1092  /* already associated, nothing to do */
1093  return;
1094  }
1095  else
1096  {
1097  /*
1098  * We already saw this transaction, but initially added it to the
1099  * list of top-level txns. Now that we know it's not top-level,
1100  * remove it from there.
1101  */
1102  dlist_delete(&subtxn->node);
1103  }
1104  }
1105 
1106  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1107  subtxn->toplevel_xid = xid;
/* a subtransaction entry is never expected to carry children of its own */
1108  Assert(subtxn->nsubtxns == 0);
1109 
1110  /* set the reference to top-level transaction */
1111  subtxn->toptxn = txn;
1112 
1113  /* add to subtransaction list */
1114  dlist_push_tail(&txn->subtxns, &subtxn->node);
1115  txn->nsubtxns++;
1116 
1117  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1118  ReorderBufferTransferSnapToParent(txn, subtxn);
1119 
1120  /* Verify LSN-ordering invariant */
1121  AssertTXNLsnOrder(rb);
1122 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1760 of file reorderbuffer.c.

/*
 * Build txn->tuplecid_hash from the tuplecid changes queued on
 * txn->tuplecids.
 *
 * Nothing is built when the transaction has no catalog changes or its
 * tuplecids list is empty. The hash is keyed by (rlocator, tid) and each
 * entry stores cmin, cmax and combocid. When the same key is seen again,
 * only cmax is updated.
 */
1761 {
1762  dlist_iter iter;
1763  HASHCTL hash_ctl;
1764 
1765  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1766  return;
1767 
1768  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1769  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1770  hash_ctl.hcxt = rb->context;
1771 
1772  /*
1773  * create the hash with the exact number of to-be-stored tuplecids from
1774  * the start
1775  */
1776  txn->tuplecid_hash =
1777  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1778  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1779 
1780  dlist_foreach(iter, &txn->tuplecids)
1781  {
1782  ReorderBufferTupleCidKey key;
1783  ReorderBufferTupleCidEnt *ent;
1784  bool found;
1785  ReorderBufferChange *change;
1786 
1787  change = dlist_container(ReorderBufferChange, node, iter.cur);
1788 
1789  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1790 
1791  /* be careful about padding */
1792  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1793 
1794  key.rlocator = change->data.tuplecid.locator;
1795 
1796  ItemPointerCopy(&change->data.tuplecid.tid,
1797  &key.tid);
1798 
1799  ent = (ReorderBufferTupleCidEnt *)
1800  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1801  if (!found)
1802  {
1803  ent->cmin = change->data.tuplecid.cmin;
1804  ent->cmax = change->data.tuplecid.cmax;
1805  ent->combocid = change->data.tuplecid.combocid;
1806  }
1807  else
1808  {
1809  /*
1810  * Maybe we already saw this tuple before in this transaction, but
1811  * if so it must have the same cmin.
1812  */
1813  Assert(ent->cmin == change->data.tuplecid.cmin);
1814 
1815  /*
1816  * cmax may be initially invalid, but once set it can only grow,
1817  * and never become invalid again.
1818  */
1819  Assert((ent->cmax == InvalidCommandId) ||
1820  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1821  (change->data.tuplecid.cmax > ent->cmax)));
1822  ent->cmax = change->data.tuplecid.cmax;
1823  }
1824  }
1825 }
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *  rb)
inline static

Definition at line 4029 of file reorderbuffer.c.

/*
 * Report whether streaming may start right now.
 *
 * Returns false until the snapshot builder has reached
 * SNAPBUILD_CONSISTENT. After that, returns true only when
 * ReorderBufferCanStream() is true and SnapBuildXactNeedsSkip() is false
 * for the reader's current record pointer.
 */
4030 {
4031  LogicalDecodingContext *ctx = rb->private_data;
4032  SnapBuild *builder = ctx->snapshot_builder;
4033 
4034  /* We can't start streaming unless a consistent state is reached. */
4035  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
4036  return false;
4037 
4038  /*
4039  * We can't start streaming immediately even if the streaming is enabled
4040  * because we previously decoded this transaction and now just are
4041  * restarting.
4042  */
4043  if (ReorderBufferCanStream(rb) &&
4044  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4045  return true;
4046 
4047  return false;
4048 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *  rb)
inline static

Definition at line 4020 of file reorderbuffer.c.

/*
 * Return the 'streaming' flag of the LogicalDecodingContext stored in
 * rb->private_data.
 */
4021 {
4022  LogicalDecodingContext *ctx = rb->private_data;
4023 
4024  return ctx->streaming;
4025 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
ReorderBufferTXN *  txn,
bool  addition,
Size  sz 
)
static

Definition at line 3221 of file reorderbuffer.c.

/*
 * Add 'sz' bytes to, or subtract them from, the memory accounting.
 *
 * At least one of 'txn' and 'change' must be non-NULL; when 'txn' is NULL
 * the transaction is taken from change->txn. The call is a no-op for
 * tuplecid changes and for sz == 0.
 *
 * The adjustment is applied to txn->size, rb->size and the total_size of
 * the transaction returned by rbtxn_get_toptxn(). The transaction's node in
 * rb->txn_heap is removed and re-added so that it is only present in the
 * heap while txn->size is non-zero.
 */
3225 {
3226  ReorderBufferTXN *toptxn;
3227 
3228  Assert(txn || change);
3229 
3230  /*
3231  * Ignore tuple CID changes, because those are not evicted when reaching
3232  * memory limit. So we just don't count them, because it might easily
3233  * trigger a pointless attempt to spill.
3234  */
3235  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3236  return;
3237 
3238  if (sz == 0)
3239  return;
3240 
3241  if (txn == NULL)
3242  txn = change->txn;
3243  Assert(txn != NULL);
3244 
3245  /*
3246  * Update the total size in top level as well. This is later used to
3247  * compute the decoding stats.
3248  */
3249  toptxn = rbtxn_get_toptxn(txn);
3250 
3251  if (addition)
3252  {
3253  Size oldsize = txn->size;
3254 
3255  txn->size += sz;
3256  rb->size += sz;
3257 
3258  /* Update the total size in the top transaction. */
3259  toptxn->total_size += sz;
3260 
3261  /* Update the max-heap */
/* a txn whose size was 0 is not in the heap yet, so only remove otherwise */
3262  if (oldsize != 0)
3263  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3264  pairingheap_add(rb->txn_heap, &txn->txn_node);
3265  }
3266  else
3267  {
3268  Assert((rb->size >= sz) && (txn->size >= sz));
3269  txn->size -= sz;
3270  rb->size -= sz;
3271 
3272  /* Update the total size in the top transaction. */
3273  toptxn->total_size -= sz;
3274 
3275  /* Update the max-heap */
3276  pairingheap_remove(rb->txn_heap, &txn->txn_node);
/* keep the txn out of the heap once it holds no accounted memory */
3277  if (txn->size != 0)
3278  pairingheap_add(rb->txn_heap, &txn->txn_node);
3279  }
3280 
3281  Assert(txn->size <= rb->size);
3282 }
size_t Size
Definition: c.h:596
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node txn_node

References ReorderBufferChange::action, Assert, pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *  change)
static

Definition at line 4172 of file reorderbuffer.c.

/*
 * Compute the accounted size of a change: sizeof(ReorderBufferChange) plus
 * an action-dependent payload.
 *
 *  - tuple changes: a HeapTupleData header plus t_len for each of the old
 *    and new tuple that is present
 *  - messages: prefix length (including the terminating NUL), message size
 *    and two Size fields
 *  - invalidations: one SharedInvalidationMessage per invalidation
 *  - snapshots: SnapshotData plus the xcnt and subxcnt TransactionId arrays
 *  - truncates: one Oid per relation
 *  - all remaining actions: nothing beyond the change struct itself
 */
4173 {
4174  Size sz = sizeof(ReorderBufferChange);
4175 
4176  switch (change->action)
4177  {
4178  /* fall through these, they're all similar enough */
4179  case REORDER_BUFFER_CHANGE_INSERT:
4180  case REORDER_BUFFER_CHANGE_UPDATE:
4181  case REORDER_BUFFER_CHANGE_DELETE:
4182  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4183  {
4184  HeapTuple oldtup,
4185  newtup;
4186  Size oldlen = 0;
4187  Size newlen = 0;
4188 
4189  oldtup = change->data.tp.oldtuple;
4190  newtup = change->data.tp.newtuple;
4191 
4192  if (oldtup)
4193  {
4194  sz += sizeof(HeapTupleData);
4195  oldlen = oldtup->t_len;
4196  sz += oldlen;
4197  }
4198 
4199  if (newtup)
4200  {
4201  sz += sizeof(HeapTupleData);
4202  newlen = newtup->t_len;
4203  sz += newlen;
4204  }
4205 
4206  break;
4207  }
4208  case REORDER_BUFFER_CHANGE_MESSAGE:
4209  {
4210  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4211 
4212  sz += prefix_size + change->data.msg.message_size +
4213  sizeof(Size) + sizeof(Size);
4214 
4215  break;
4216  }
4217  case REORDER_BUFFER_CHANGE_INVALIDATION:
4218  {
4219  sz += sizeof(SharedInvalidationMessage) *
4220  change->data.inval.ninvalidations;
4221  break;
4222  }
4223  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4224  {
4225  Snapshot snap;
4226 
4227  snap = change->data.snapshot;
4228 
4229  sz += sizeof(SnapshotData) +
4230  sizeof(TransactionId) * snap->xcnt +
4231  sizeof(TransactionId) * snap->subxcnt;
4232 
4233  break;
4234  }
4235  case REORDER_BUFFER_CHANGE_TRUNCATE:
4236  {
4237  sz += sizeof(Oid) * change->data.truncate.nrelids;
4238 
4239  break;
4240  }
4241  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4242  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4243  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4244  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4245  /* ReorderBufferChange contains everything important */
4246  break;
4247  }
4248 
4249  return sz;
4250 }
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@105::@107 truncate
struct ReorderBufferChange::@105::@106 tp
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *  rb)
static

Definition at line 3637 of file reorderbuffer.c.

/*
 * Evict transactions from memory until rb->size is below
 * logical_decoding_work_mem * 1024 bytes.
 *
 * Returns immediately when debug_logical_replication_streaming is
 * "buffered" and the limit has not been reached. When it is "immediate",
 * eviction continues until rb->size drops to zero.
 *
 * Each iteration streams the transaction picked by
 * ReorderBufferLargestStreamableTopTXN() if streaming can start and such a
 * transaction exists. Otherwise it serializes the transaction returned by
 * ReorderBufferLargestTXN() to disk.
 */
3638 {
3639  ReorderBufferTXN *txn;
3640 
3641  /*
3642  * Bail out if debug_logical_replication_streaming is buffered and we
3643  * haven't exceeded the memory limit.
3644  */
3645  if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
3646  rb->size < logical_decoding_work_mem * 1024L)
3647  return;
3648 
3649  /*
3650  * If debug_logical_replication_streaming is immediate, loop until there's
3651  * no change. Otherwise, loop until we reach under the memory limit. One
3652  * might think that just by evicting the largest (sub)transaction we will
3653  * come under the memory limit based on assumption that the selected
3654  * transaction is at least as large as the most recent change (which
3655  * caused us to go over the memory limit). However, that is not true
3656  * because a user can reduce the logical_decoding_work_mem to a smaller
3657  * value before the most recent change.
3658  */
3659  while (rb->size >= logical_decoding_work_mem * 1024L ||
3660  (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE &&
3661  rb->size > 0))
3662  {
3663  /*
3664  * Pick the largest transaction and evict it from memory by streaming,
3665  * if possible. Otherwise, spill to disk.
3666  */
3667  if (ReorderBufferCanStartStreaming(rb) &&
3668  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3669  {
3670  /* we know there has to be one, because the size is not zero */
3671  Assert(txn && rbtxn_is_toptxn(txn));
3672  Assert(txn->total_size > 0);
3673  Assert(rb->size >= txn->total_size);
3674 
3675  ReorderBufferStreamTXN(rb, txn);
3676  }
3677  else
3678  {
3679  /*
3680  * Pick the largest transaction (or subtransaction) and evict it
3681  * from memory by serializing it to disk.
3682  */
3683  txn = ReorderBufferLargestTXN(rb);
3684 
3685  /* we know there has to be one, because the size is not zero */
3686  Assert(txn);
3687  Assert(txn->size > 0);
3688  Assert(rb->size >= txn->size);
3689 
3690  ReorderBufferSerializeTXN(rb, txn);
3691  }
3692 
3693  /*
3694  * After eviction, the transaction should have no entries in memory,
3695  * and should use 0 bytes for changes.
3696  */
3697  Assert(txn->size == 0);
3698  Assert(txn->nentries_mem == 0);
3699  }
3700 
3701  /* We must be under the memory limit now. */
3702  Assert(rb->size < logical_decoding_work_mem * 1024L);
3703 
3704 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert, DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4598 of file reorderbuffer.c.

/*
 * Remove every file whose name starts with "xid" from the directory
 * PG_REPLSLOT_DIR/<slotname>.
 *
 * Returns without doing anything when the path exists but is not a
 * directory. Directory read problems are reported at INFO level through
 * ReadDirExtended(); a failing unlink() raises an ERROR.
 */
4599 {
4600  DIR *spill_dir;
4601  struct dirent *spill_de;
4602  struct stat statbuf;
4603  char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4604 
4605  sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4606 
4607  /* we're only handling directories here, skip if it's not ours */
4608  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4609  return;
4610 
4611  spill_dir = AllocateDir(path);
4612  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4613  {
4614  /* only look at names that can be ours */
4615  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4616  {
/* 'path' is reused here to hold the full name of the file to delete */
4617  snprintf(path, sizeof(path),
4618  "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4619  spill_de->d_name);
4620 
4621  if (unlink(path) != 0)
4622  ereport(ERROR,
4623  (errcode_for_file_access(),
4624  errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4625  path, PG_REPLSLOT_DIR, slotname)));
4626  }
4627  }
4628  FreeDir(spill_dir);
4629 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2984
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2947
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2866
#define snprintf
Definition: port.h:238
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1513 of file reorderbuffer.c.

/*
 * Release everything held by 'txn' and remove it from the reorder buffer.
 *
 * In order: recursively clean up all subtransactions, return all queued
 * changes (the memory counter is updated once with their summed size),
 * return the tuplecid changes, release the base snapshot and the snapshot
 * of the last streamed run if set, unlink the transaction from its lists
 * and from rb->by_txn, remove its spilled data if it was serialized, and
 * finally hand the ReorderBufferTXN back via ReorderBufferReturnTXN().
 */
1514 {
1515  bool found;
1516  dlist_mutable_iter iter;
1517  Size mem_freed = 0;
1518 
1519  /* cleanup subtransactions & their changes */
1520  dlist_foreach_modify(iter, &txn->subtxns)
1521  {
1522  ReorderBufferTXN *subtxn;
1523 
1524  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1525 
1526  /*
1527  * Subtransactions are always associated to the toplevel TXN, even if
1528  * they originally were happening inside another subtxn, so we won't
1529  * ever recurse more than one level deep here.
1530  */
1531  Assert(rbtxn_is_known_subxact(subtxn));
1532  Assert(subtxn->nsubtxns == 0);
1533 
1534  ReorderBufferCleanupTXN(rb, subtxn);
1535  }
1536 
1537  /* cleanup changes in the txn */
1538  dlist_foreach_modify(iter, &txn->changes)
1539  {
1540  ReorderBufferChange *change;
1541 
1542  change = dlist_container(ReorderBufferChange, node, iter.cur);
1543 
1544  /* Check we're not mixing changes from different transactions. */
1545  Assert(change->txn == txn);
1546 
1547  /*
1548  * Instead of updating the memory counter for individual changes, we
1549  * sum up the size of memory to free so we can update the memory
1550  * counter all together below. This saves costs of maintaining the
1551  * max-heap.
1552  */
1553  mem_freed += ReorderBufferChangeSize(change);
1554 
1555  ReorderBufferReturnChange(rb, change, false);
1556  }
1557 
1558  /* Update the memory counter */
1559  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1560 
1561  /*
1562  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1563  * They are always stored in the toplevel transaction.
1564  */
1565  dlist_foreach_modify(iter, &txn->tuplecids)
1566  {
1567  ReorderBufferChange *change;
1568 
1569  change = dlist_container(ReorderBufferChange, node, iter.cur);
1570 
1571  /* Check we're not mixing changes from different transactions. */
1572  Assert(change->txn == txn);
1573  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1574 
1575  ReorderBufferReturnChange(rb, change, true);
1576  }
1577 
1578  /*
1579  * Cleanup the base snapshot, if set.
1580  */
1581  if (txn->base_snapshot != NULL)
1582  {
1583  SnapBuildSnapDecRefcount(txn->base_snapshot);
1584  dlist_delete(&txn->base_snapshot_node);
1585  }
1586 
1587  /*
1588  * Cleanup the snapshot for the last streamed run.
1589  */
1590  if (txn->snapshot_now != NULL)
1591  {
1592  Assert(rbtxn_is_streamed(txn));
1593  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1594  }
1595 
1596  /*
1597  * Remove TXN from its containing lists.
1598  *
1599  * Note: if txn is known as subxact, we are deleting the TXN from its
1600  * parent's list of known subxacts; this leaves the parent's nsubtxns
1601  * count too high, but we don't care. Otherwise, we are deleting the TXN
1602  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1603  * list of catalog modifying transactions as well.
1604  */
1605  dlist_delete(&txn->node);
1606  if (rbtxn_has_catalog_changes(txn))
1607  dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
1608 
1609  /* now remove reference from buffer */
1610  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1611  Assert(found);
1612 
1613  /* remove entries spilled to disk */
1614  if (rbtxn_is_serialized(txn))
1615  ReorderBufferRestoreCleanup(rb, txn);
1616 
1617  /* deallocate */
1618  ReorderBufferReturnTXN(rb, txn);
1619 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2740 of file reorderbuffer.c.

/*
 * Look up the transaction 'xid' and, if the reorder buffer knows it, pass
 * it together with the commit information to ReorderBufferReplay().
 * Transactions that are not found are silently ignored.
 */
2744 {
2745  ReorderBufferTXN *txn;
2746 
2747  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2748  false);
2749 
2750  /* unknown transaction, nothing to replay */
2751  if (txn == NULL)
2752  return;
2753 
2754  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2755  origin_id, origin_lsn);
2756 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1197 of file reorderbuffer.c.

/*
 * Record the commit of subtransaction 'subxid' of top-level transaction
 * 'xid'.
 *
 * If the subtransaction is unknown to the reorder buffer nothing happens.
 * Otherwise its final_lsn and end_lsn are set from the arguments and
 * ReorderBufferAssignChild() is called to link it to 'xid'.
 */
1200 {
1201  ReorderBufferTXN *subtxn;
1202 
1203  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1204  InvalidXLogRecPtr, false);
1205 
1206  /*
1207  * No need to do anything if that subtxn didn't contain any changes
1208  */
1209  if (!subtxn)
1210  return;
1211 
1212  subtxn->final_lsn = commit_lsn;
1213  subtxn->end_lsn = end_lsn;
1214 
1215  /*
1216  * Assign this subxact as a child of the toplevel xact (no-op if already
1217  * done.)
1218  */
1219  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1220 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1833 of file reorderbuffer.c.

/*
 * Return a copy of 'orig_snap' allocated in rb->context.
 *
 * The copy is one allocation: the SnapshotData struct, followed by the xip
 * array (orig_snap->xcnt entries, copied), followed by the subxip array.
 * subxip is rebuilt from scratch: it holds txn->xid plus the xid of every
 * entry on txn->subtxns, sorted with xidComparator. The copy is marked
 * 'copied', gets active_count = 1 and regd_count = 0, and its curcid is set
 * to 'cid'.
 */
1835 {
1836  Snapshot snap;
1837  dlist_iter iter;
1838  int i = 0;
1839  Size size;
1840 
/* room for the struct, the xip array and up to nsubtxns + 1 subxip entries */
1841  size = sizeof(SnapshotData) +
1842  sizeof(TransactionId) * orig_snap->xcnt +
1843  sizeof(TransactionId) * (txn->nsubtxns + 1);
1844 
1845  snap = MemoryContextAllocZero(rb->context, size);
1846  memcpy(snap, orig_snap, sizeof(SnapshotData));
1847 
1848  snap->copied = true;
1849  snap->active_count = 1; /* mark as active so nobody frees it */
1850  snap->regd_count = 0;
/* the xip array lives directly behind the struct in the same allocation */
1851  snap->xip = (TransactionId *) (snap + 1);
1852 
1853  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1854 
1855  /*
1856  * snap->subxip contains all txids that belong to our transaction which we
1857  * need to check via cmin/cmax. That's why we store the toplevel
1858  * transaction in there as well.
1859  */
1860  snap->subxip = snap->xip + snap->xcnt;
1861  snap->subxip[i++] = txn->xid;
1862 
1863  /*
1864  * subxcnt isn't decreased when subtransactions abort, so count manually.
1865  * Since it's an upper boundary it is safe to use it for the allocation
1866  * above.
1867  */
1868  snap->subxcnt = 1;
1869 
1870  dlist_foreach(iter, &txn->subtxns)
1871  {
1872  ReorderBufferTXN *sub_txn;
1873 
1874  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1875  snap->subxip[i++] = sub_txn->xid;
1876  snap->subxcnt++;
1877  }
1878 
1879  /* sort so we can bsearch() later */
1880  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1881 
1882  /* store the specified current CommandId */
1883  snap->curcid = cid;
1884 
1885  return snap;
1886 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
#define qsort(a, b, c, d)
Definition: port.h:447
static pg_noinline void Size size
Definition: slab.c:607
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, size, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage *  msgs 
)
static

Definition at line 3388 of file reorderbuffer.c.

3389 {
3390  int i;
3391 
3392  for (i = 0; i < nmsgs; i++)
3393  LocalExecuteInvalidationMessage(&msgs[i]);
3394 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2846 of file reorderbuffer.c.

2851 {
2852  ReorderBufferTXN *txn;
2853  XLogRecPtr prepare_end_lsn;
2854  TimestampTz prepare_time;
2855 
2856  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2857 
2858  /* unknown transaction, nothing to do */
2859  if (txn == NULL)
2860  return;
2861 
2862  /*
2863  * By this time the txn has the prepare record information, remember it to
2864  * be later used for rollback.
2865  */
2866  prepare_end_lsn = txn->end_lsn;
2867  prepare_time = txn->xact_time.prepare_time;
2868 
2869  /* add the gid in the txn */
2870  txn->gid = pstrdup(gid);
2871 
2872  /*
2873  * It is possible that this transaction is not decoded at prepare time
2874  * either because by that time we didn't have a consistent snapshot, or
2875  * two_phase was not enabled, or it was decoded earlier but we have
2876  * restarted. We only need to send the prepare if it was not decoded
2877  * earlier. We don't need to decode the xact for aborts if it is not done
2878  * already.
2879  */
2880  if ((txn->final_lsn < two_phase_at) && is_commit)
2881  {
2882  txn->txn_flags |= RBTXN_PREPARE;
2883 
2884  /*
2885  * The prepare info must have been updated in txn even if we skip
2886  * prepare.
2887  */
2888  Assert(txn->final_lsn != InvalidXLogRecPtr);
2889 
2890  /*
2891  * By this time the txn has the prepare record information and it is
2892  * important to use that so that downstream gets the accurate
2893  * information. If instead, we have passed commit information here
2894  * then downstream can behave as it has already replayed commit
2895  * prepared after the restart.
2896  */
2897  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2898  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2899  }
2900 
2901  txn->final_lsn = commit_lsn;
2902  txn->end_lsn = end_lsn;
2903  txn->xact_time.commit_time = commit_time;
2904  txn->origin_id = origin_id;
2905  txn->origin_lsn = origin_lsn;
2906 
2907  if (is_commit)
2908  rb->commit_prepared(rb, txn, commit_lsn);
2909  else
2910  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2911 
2912  /* cleanup: make sure there's no cache pollution */
2913  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2914  txn->invalidations);
2915  ReorderBufferCleanupTXN(rb, txn);
2916 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3023 of file reorderbuffer.c.

3024 {
3025  ReorderBufferTXN *txn;
3026 
3027  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3028  false);
3029 
3030  /* unknown, nothing to forget */
3031  if (txn == NULL)
3032  return;
3033 
3034  /* this transaction mustn't be streamed */
3035  Assert(!rbtxn_is_streamed(txn));
3036 
3037  /* cosmetic... */
3038  txn->final_lsn = lsn;
3039 
3040  /*
3041  * Process cache invalidation messages if there are any. Even if we're not
3042  * interested in the transaction's contents, it could have manipulated the
3043  * catalog and we need to update the caches according to that.
3044  */
3045  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3046  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3047  txn->invalidations);
3048  else
3049  Assert(txn->ninvalidations == 0);
3050 
3051  /* remove potential on-disk data, and deallocate */
3052  ReorderBufferCleanupTXN(rb, txn);
3053 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer *  rb)

Definition at line 401 of file reorderbuffer.c.

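402 {
403  MemoryContext context = rb->context;
404 
405  /*
406  * We free separately allocated data by entirely scrapping reorderbuffer's
407  * memory context.
408  */
409  MemoryContextDelete(context);
410 
411  /* Free disk space used by unconsumed reorder buffers */
412  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
413 }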
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
tree context
Definition: radixtree.h:1835

References context, ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer *  rb,
Snapshot  snap 
)
static

Definition at line 1892 of file reorderbuffer.c.

1893 {
1894  if (snap->copied)
1895  pfree(snap);
1896  else
1897  SnapBuildSnapDecRefcount(snap);
1898 }
void pfree(void *pointer)
Definition: mcxt.c:1521

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer *  rb)

Definition at line 3438 of file reorderbuffer.c.

3439 {
3440  dlist_iter iter;
3441  TransactionId *xids = NULL;
3442  size_t xcnt = 0;
3443 
3444  /* Quick return if the list is empty */
3445  if (dclist_count(&rb->catchange_txns) == 0)
3446  return NULL;
3447 
3448  /* Initialize XID array */
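3449  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3450  dclist_count(&rb->catchange_txns));
3451  dclist_foreach(iter, &rb->catchange_txns)
3452  {
3453  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3454  catchange_node,
3455  iter.cur);
3456 
3457  Assert(rbtxn_has_catalog_changes(txn));
3458 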
3459  xids[xcnt++] = txn->xid;
3460  }
3461 
3462  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3463 
3464  Assert(xcnt == dclist_count(&rb->catchange_txns));
3465  return xids;
3466 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer *  rb)

Definition at line 1022 of file reorderbuffer.c.

1023 {
1024  ReorderBufferTXN *txn;
1025 
1026  AssertTXNLsnOrder(rb);
1027 
1028  if (dlist_is_empty(&rb->toplevel_by_lsn))
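1029  return NULL;
1030 
1031  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1032 
1033  Assert(!rbtxn_is_known_subxact(txn));
1034  Assert(txn->first_lsn != InvalidXLogRecPtr);
1035  return txn;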
1036 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer *  rb)

Definition at line 1050 of file reorderbuffer.c.

1051 {
1052  ReorderBufferTXN *txn;
1053 
1054  AssertTXNLsnOrder(rb);
1055 
1056  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1057  return InvalidTransactionId;
1058 
1059  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1060  &rb->txns_by_base_snapshot_lsn);
1061  return txn->base_snapshot->xmin;
1062 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer *  rb,
int  nrelids 
)

Definition at line 603 of file reorderbuffer.c.

604 {
605  Oid *relids;
606  Size alloc_len;
607 
608  alloc_len = sizeof(Oid) * nrelids;
609 
610  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
611 
612  return relids;
613 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer *  rb,
Size  tuple_len 
)

Definition at line 570 of file reorderbuffer.c.

571 {
572  HeapTuple tuple;
573  Size alloc_len;
574 
575  alloc_len = tuple_len + SizeofHeapTupleHeader;
576 
577  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
578  HEAPTUPLESIZE + alloc_len);
579  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
580 
581  return tuple;
582 }
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer *  rb)
static

Definition at line 419 of file reorderbuffer.c.

420 {
421  ReorderBufferTXN *txn;
422 
423  txn = (ReorderBufferTXN *)
424  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
425 
426  memset(txn, 0, sizeof(ReorderBufferTXN));
427 
428  dlist_init(&txn->changes);
429  dlist_init(&txn->tuplecids);
430  dlist_init(&txn->subtxns);
431 
432  /* InvalidCommandId is not zero, so set it explicitly */
433  txn->command_id = InvalidCommandId;
434  txn->output_plugin_private = NULL;
435 
436  return txn;
437 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer *  rb,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 3095 of file reorderbuffer.c.

3097 {
3098  bool use_subtxn = IsTransactionOrTransactionBlock();
3099  int i;
3100 
3101  if (use_subtxn)
3102  BeginInternalSubTransaction("replay");
3103 
3104  /*
3105  * Force invalidations to happen outside of a valid transaction - that way
3106  * entries will just be marked as invalid without accessing the catalog.
3107  * That's advantageous because we don't need to setup the full state
3108  * necessary for catalog access.
3109  */
3110  if (use_subtxn)
3111  AbortCurrentTransaction();
3112 
3113  for (i = 0; i < ninvalidations; i++)
3114  LocalExecuteInvalidationMessage(&invalidations[i]);
3115 
3116  if (use_subtxn)
3117  RollbackAndReleaseCurrentSubTransaction();
3118 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4982
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4687
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4789
void AbortCurrentTransaction(void)
Definition: xact.c:3431

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3064 of file reorderbuffer.c.

3065 {
3066  ReorderBufferTXN *txn;
3067 
3068  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3069  false);
3070 
3071  /* unknown, nothing to do */
3072  if (txn == NULL)
3073  return;
3074 
3075  /*
3076  * Process cache invalidation messages if there are any. Even if we're not
3077  * interested in the transaction's contents, it could have manipulated the
3078  * catalog and we need to update the caches according to that.
3079  */
3080  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3081  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3082  txn->invalidations);
3083  else
3084  Assert(txn->ninvalidations == 0);
3085 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1239 of file reorderbuffer.c.

1240 {
1241  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1242  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1243  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1244 
1245  if (pos_a < pos_b)
1246  return 1;
1247  else if (pos_a == pos_b)
1248  return 0;
1249  return -1;
1250 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1482 of file reorderbuffer.c.

1484 {
1485  int32 off;
1486 
1487  for (off = 0; off < state->nr_txns; off++)
1488  {
1489  if (state->entries[off].file.vfd != -1)
1490  FileClose(state->entries[off].file.vfd);
1491  }
1492 
1493  /* free memory we might have "leaked" in the last *Next call */
1494  if (!dlist_is_empty(&state->old_change))
1495  {
1496  ReorderBufferChange *change;
1497 
1498  change = dlist_container(ReorderBufferChange, node,
1499  dlist_pop_head_node(&state->old_change));
1500  ReorderBufferReturnChange(rb, change, true);
1501  Assert(dlist_is_empty(&state->old_change));
1502  }
1503 
1504  binaryheap_free(state->heap);
1505  pfree(state);
1506 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1978
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1262 of file reorderbuffer.c.

1264 {
1265  Size nr_txns = 0;
1266  ReorderBufferIterTXNState *state;
1267  dlist_iter cur_txn_i;
1268  int32 off;
1269 
1270  *iter_state = NULL;
1271 
1272  /* Check ordering of changes in the toplevel transaction. */
1273  AssertChangeLsnOrder(txn);
1274 
1275  /*
1276  * Calculate the size of our heap: one element for every transaction that
1277  * contains changes. (Besides the transactions already in the reorder
1278  * buffer, we count the one we were directly passed.)
1279  */
1280  if (txn->nentries > 0)
1281  nr_txns++;
1282 
1283  dlist_foreach(cur_txn_i, &txn->subtxns)
1284  {
1285  ReorderBufferTXN *cur_txn;
1286 
1287  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1288 
1289  /* Check ordering of changes in this subtransaction. */
1290  AssertChangeLsnOrder(cur_txn);
1291 
1292  if (cur_txn->nentries > 0)
1293  nr_txns++;
1294  }
1295 
1296  /* allocate iteration state */
1297  state = (ReorderBufferIterTXNState *)
1298  MemoryContextAllocZero(rb->context,
1299  sizeof(ReorderBufferIterTXNState) +
1300  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1301 
1302  state->nr_txns = nr_txns;
1303  dlist_init(&state->old_change);
1304 
1305  for (off = 0; off < state->nr_txns; off++)
1306  {
1307  state->entries[off].file.vfd = -1;
1308  state->entries[off].segno = 0;
1309  }
1310 
1311  /* allocate heap */
1312  state->heap = binaryheap_allocate(state->nr_txns,
1313  ReorderBufferIterCompare,
1314  state);
1315 
1316  /* Now that the state fields are initialized, it is safe to return it. */
1317  *iter_state = state;
1318 
1319  /*
1320  * Now insert items into the binary heap, in an unordered fashion. (We
1321  * will run a heap assembly step at the end; this is more efficient.)
1322  */
1323 
1324  off = 0;
1325 
1326  /* add toplevel transaction if it contains changes */
1327  if (txn->nentries > 0)
1328  {
1329  ReorderBufferChange *cur_change;
1330 
1331  if (rbtxn_is_serialized(txn))
1332  {
1333  /* serialize remaining changes */
1334  ReorderBufferSerializeTXN(rb, txn);
1335  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1336  &state->entries[off].segno);
1337  }
1338 
1339  cur_change = dlist_head_element(ReorderBufferChange, node,
1340  &txn->changes);
1341 
1342  state->entries[off].lsn = cur_change->lsn;
1343  state->entries[off].change = cur_change;
1344  state->entries[off].txn = txn;
1345 
1346  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1347  }
1348 
1349  /* add subtransactions if they contain changes */
1350  dlist_foreach(cur_txn_i, &txn->subtxns)
1351  {
1352  ReorderBufferTXN *cur_txn;
1353 
1354  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1355 
1356  if (cur_txn->nentries > 0)
1357  {
1358  ReorderBufferChange *cur_change;
1359 
1360  if (rbtxn_is_serialized(cur_txn))
1361  {
1362  /* serialize remaining changes */
1363  ReorderBufferSerializeTXN(rb, cur_txn);
1364  ReorderBufferRestoreChanges(rb, cur_txn,
1365  &state->entries[off].file,
1366  &state->entries[off].segno);
1367  }
1368  cur_change = dlist_head_element(ReorderBufferChange, node,
1369  &cur_txn->changes);
1370 
1371  state->entries[off].lsn = cur_change->lsn;
1372  state->entries[off].change = cur_change;
1373  state->entries[off].txn = cur_txn;
1374 
1375  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1376  }
1377  }
1378 
1379  /* assemble a valid binary heap */
1380  binaryheap_build(state->heap);
1381 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1390 of file reorderbuffer.c.

1391 {
1392  ReorderBufferChange *change;
1393  ReorderBufferIterTXNEntry *entry;
1394  int32 off;
1395 
1396  /* nothing there anymore */
1397  if (state->heap->bh_size == 0)
1398  return NULL;
1399 
1400  off = DatumGetInt32(binaryheap_first(state->heap));
1401  entry = &state->entries[off];
1402 
1403  /* free memory we might have "leaked" in the previous *Next call */
1404  if (!dlist_is_empty(&state->old_change))
1405  {
1406  change = dlist_container(ReorderBufferChange, node,
1407  dlist_pop_head_node(&state->old_change));
1408  ReorderBufferReturnChange(rb, change, true);
1409  Assert(dlist_is_empty(&state->old_change));
1410  }
1411 
1412  change = entry->change;
1413 
1414  /*
1415  * update heap with information about which transaction has the next
1416  * relevant change in LSN order
1417  */
1418 
1419  /* there are in-memory changes */
1420  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1421  {
1422  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
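1423  ReorderBufferChange *next_change =
1424  dlist_container(ReorderBufferChange, node, next);
1425 
1426  /* txn stays the same */
1427  state->entries[off].lsn = next_change->lsn;
1428  state->entries[off].change = next_change;
1429 
1430  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1431  return change;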
1432  }
1433 
1434  /* try to load changes from disk */
1435  if (entry->txn->nentries != entry->txn->nentries_mem)
1436  {
1437  /*
1438  * Ugly: restoring changes will reuse *Change records, thus delete the
1439  * current one from the per-tx list and only free in the next call.
1440  */
1441  dlist_delete(&change->node);
1442  dlist_push_tail(&state->old_change, &change->node);
1443 
1444  /*
1445  * Update the total bytes processed by the txn for which we are
1446  * releasing the current set of changes and restoring the new set of
1447  * changes.
1448  */
1449  rb->totalBytes += entry->txn->size;
1450  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1451  &state->entries[off].segno))
1452  {
1453  /* successfully restored changes from disk */
1454  ReorderBufferChange *next_change =
1455  dlist_head_element(ReorderBufferChange, node,
1456  &entry->txn->changes);
1457 
1458  elog(DEBUG2, "restored %u/%u changes from disk",
1459  (uint32) entry->txn->nentries_mem,
1460  (uint32) entry->txn->nentries);
1461 
1462  Assert(entry->txn->nentries_mem);
1463  /* txn stays the same */
1464  state->entries[off].lsn = next_change->lsn;
1465  state->entries[off].change = next_change;
1466  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1467 
1468  return change;
1469  }
1470  }
1471 
1472  /* ok, no changes there anymore, remove */
1473  binaryheap_remove_first(state->heap);
1474 
1475  return change;
1476 }
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:222
unsigned int uint32
Definition: c.h:506
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer *  rb)
static

Definition at line 3593 of file reorderbuffer.c.

3594 {
3595  dlist_iter iter;
3596  Size largest_size = 0;
3597  ReorderBufferTXN *largest = NULL;
3598 
3599  /* Find the largest top-level transaction having a base snapshot. */
3600  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3601  {
3602  ReorderBufferTXN *txn;
3603 
3604  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3605 
3606  /* must not be a subtxn */
3607  Assert(!rbtxn_is_known_subxact(txn));
3608  /* base_snapshot must be set */
3609  Assert(txn->base_snapshot != NULL);
3610 
3611  if ((largest == NULL || txn->total_size > largest_size) &&
3612  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3613  rbtxn_has_streamable_change(txn))
3614  {
3615  largest = txn;
3616  largest_size = txn->total_size;
3617  }
3618  }
3619 
3620  return largest;
3621 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer *  rb)
static

Definition at line 3553 of file reorderbuffer.c.

3554 {
3555  ReorderBufferTXN *largest;
3556 
3557  /* Get the largest transaction from the max-heap */
3558  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3559  pairingheap_first(rb->txn_heap));
3560 
3561  Assert(largest);
3562  Assert(largest->size > 0);
3563  Assert(largest->size <= rb->size);
3564 
3565  return largest;
3566 }
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert, pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer *  rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2809 of file reorderbuffer.c.

2811 {
2812  ReorderBufferTXN *txn;
2813 
2814  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2815  false);
2816 
2817  /* unknown transaction, nothing to replay */
2818  if (txn == NULL)
2819  return;
2820 
2821  txn->txn_flags |= RBTXN_PREPARE;
2822  txn->gid = pstrdup(gid);
2823 
2824  /* The prepare info must have been updated in txn by now. */
2825  Assert(txn->final_lsn != InvalidXLogRecPtr);
2826 
2827  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2828  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2829 
2830  /*
2831  * We send the prepare for the concurrently aborted xacts so that later
2832  * when rollback prepared is decoded and sent, the downstream should be
2833  * able to rollback such a xact. See comments atop DecodePrepare.
2834  *
2835  * Note, for the concurrent_abort + streaming case a stream_prepare was
2836  * already sent within the ReorderBufferReplay call above.
2837  */
2838  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2839  rb->prepare(rb, txn, txn->final_lsn);
2840 }
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  toast_insert 
)
static

Definition at line 719 of file reorderbuffer.c.

722 {
723  ReorderBufferTXN *toptxn;
724 
725  /*
726  * The partial changes need to be processed only while streaming
727  * in-progress transactions.
728  */
729  if (!ReorderBufferCanStream(rb))
730  return;
731 
732  /* Get the top transaction. */
733  toptxn = rbtxn_get_toptxn(txn);
734 
735  /*
736  * Indicate a partial change for toast inserts. The change will be
737  * considered as complete once we get the insert or update on the main
738  * table and we are sure that the pending toast chunks are not required
739  * anymore.
740  *
741  * If we allow streaming when there are pending toast chunks then such
742  * chunks won't be released till the insert (multi_insert) is complete and
743  * we expect the txn to have streamed all changes after streaming. This
744  * restriction is mainly to ensure the correctness of streamed
745  * transactions and it doesn't seem worth uplifting such a restriction
746  * just to allow this case because anyway we will stream the transaction
747  * once such an insert is complete.
748  */
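749  if (toast_insert)
750  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
751  else if (rbtxn_has_partial_change(toptxn) &&
752  IsInsertOrUpdate(change->action) &&
753  change->data.tp.clear_toast_afterwards)
754  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
755 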
756  /*
757  * Indicate a partial change for speculative inserts. The change will be
758  * considered as complete once we get the speculative confirm or abort
759  * token.
760  */
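761  if (IsSpecInsert(change->action))
762  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
763  else if (rbtxn_has_partial_change(toptxn) &&
764  IsSpecConfirmOrAbort(change->action))
765  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
766 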
767  /*
768  * Stream the transaction if it is serialized before and the changes are
769  * now complete in the top-level transaction.
770  *
771  * The reason for doing the streaming of such a transaction as soon as we
772  * get the complete change for it is that previously it would have reached
773  * the memory threshold and wouldn't get streamed because of incomplete
774  * changes. Delaying such transactions would increase apply lag for them.
775  */
776  if (ReorderBufferCanStartStreaming(rb) &&
777  !(rbtxn_has_partial_change(toptxn)) &&
778  rbtxn_is_serialized(txn) &&
779  rbtxn_has_streamable_change(toptxn))
780  ReorderBufferStreamTXN(rb, toptxn);
781 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2109 of file reorderbuffer.c.

2114 {
2115  bool using_subtxn;
2117  ReorderBufferIterTXNState *volatile iterstate = NULL;
2118  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2119  ReorderBufferChange *volatile specinsert = NULL;
2120  volatile bool stream_started = false;
2121  ReorderBufferTXN *volatile curtxn = NULL;
2122 
2123  /* build data to be able to lookup the CommandIds of catalog tuples */
2125 
2126  /* setup the initial snapshot */
2127  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2128 
2129  /*
2130  * Decoding needs access to syscaches et al., which in turn use
2131  * heavyweight locks and such. Thus we need to have enough state around to
2132  * keep track of those. The easiest way is to simply use a transaction
2133  * internally. That also allows us to easily enforce that nothing writes
2134  * to the database by checking for xid assignments.
2135  *
2136  * When we're called via the SQL SRF there's already a transaction
2137  * started, so start an explicit subtransaction there.
2138  */
2139  using_subtxn = IsTransactionOrTransactionBlock();
2140 
2141  PG_TRY();
2142  {
2143  ReorderBufferChange *change;
2144  int changes_count = 0; /* used to accumulate the number of
2145  * changes */
2146 
2147  if (using_subtxn)
2148  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2149  else
2151 
2152  /*
2153  * We only need to send begin/begin-prepare for non-streamed
2154  * transactions.
2155  */
2156  if (!streaming)
2157  {
2158  if (rbtxn_prepared(txn))
2159  rb->begin_prepare(rb, txn);
2160  else
2161  rb->begin(rb, txn);
2162  }
2163 
2164  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2165  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2166  {
2167  Relation relation = NULL;
2168  Oid reloid;
2169 
2171 
2172  /*
2173  * We can't call start stream callback before processing first
2174  * change.
2175  */
2176  if (prev_lsn == InvalidXLogRecPtr)
2177  {
2178  if (streaming)
2179  {
2180  txn->origin_id = change->origin_id;
2181  rb->stream_start(rb, txn, change->lsn);
2182  stream_started = true;
2183  }
2184  }
2185 
2186  /*
2187  * Enforce correct ordering of changes, merged from multiple
2188  * subtransactions. The changes may have the same LSN due to
2189  * MULTI_INSERT xlog records.
2190  */
2191  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2192 
2193  prev_lsn = change->lsn;
2194 
2195  /*
2196  * Set the current xid to detect concurrent aborts. This is
2197  * required for the cases when we decode the changes before the
2198  * COMMIT record is processed.
2199  */
2200  if (streaming || rbtxn_prepared(change->txn))
2201  {
2202  curtxn = change->txn;
2203  SetupCheckXidLive(curtxn->xid);
2204  }
2205 
2206  switch (change->action)
2207  {
2209 
2210  /*
2211  * Confirmation for speculative insertion arrived. Simply
2212  * use as a normal record. It'll be cleaned up at the end
2213  * of INSERT processing.
2214  */
2215  if (specinsert == NULL)
2216  elog(ERROR, "invalid ordering of speculative insertion changes");
2217  Assert(specinsert->data.tp.oldtuple == NULL);
2218  change = specinsert;
2220 
2221  /* intentionally fall through */
2225  Assert(snapshot_now);
2226 
2227  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2228  change->data.tp.rlocator.relNumber);
2229 
2230  /*
2231  * Mapped catalog tuple without data, emitted while
2232  * catalog table was in the process of being rewritten. We
2233  * can fail to look up the relfilenumber, because the
2234  * relmapper has no "historic" view, in contrast to the
2235  * normal catalog during decoding. Thus repeated rewrites
2236  * can cause a lookup failure. That's OK because we do not
2237  * decode catalog changes anyway. Normally such tuples
2238  * would be skipped over below, but we can't identify
2239  * whether the table should be logically logged without
2240  * mapping the relfilenumber to the oid.
2241  */
2242  if (reloid == InvalidOid &&
2243  change->data.tp.newtuple == NULL &&
2244  change->data.tp.oldtuple == NULL)
2245  goto change_done;
2246  else if (reloid == InvalidOid)
2247  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2248  relpathperm(change->data.tp.rlocator,
2249  MAIN_FORKNUM));
2250 
2251  relation = RelationIdGetRelation(reloid);
2252 
2253  if (!RelationIsValid(relation))
2254  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2255  reloid,
2256  relpathperm(change->data.tp.rlocator,
2257  MAIN_FORKNUM));
2258 
2259  if (!RelationIsLogicallyLogged(relation))
2260  goto change_done;
2261 
2262  /*
2263  * Ignore temporary heaps created during DDL unless the
2264  * plugin has asked for them.
2265  */
2266  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2267  goto change_done;
2268 
2269  /*
2270  * For now ignore sequence changes entirely. Most of the
2271  * time they don't log changes using records we
2272  * understand, so it doesn't make sense to handle the few
2273  * cases we do.
2274  */
2275  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2276  goto change_done;
2277 
2278  /* user-triggered change */
2279  if (!IsToastRelation(relation))
2280  {
2281  ReorderBufferToastReplace(rb, txn, relation, change);
2282  ReorderBufferApplyChange(rb, txn, relation, change,
2283  streaming);
2284 
2285  /*
2286  * Only clear reassembled toast chunks if we're sure
2287  * they're not required anymore. The creator of the
2288  * tuple tells us.
2289  */
2290  if (change->data.tp.clear_toast_afterwards)
2291  ReorderBufferToastReset(rb, txn);
2292  }
2293  /* we're not interested in toast deletions */
2294  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2295  {
2296  /*
2297  * Need to reassemble the full toasted Datum in
2298  * memory. To ensure the chunks don't get reused till
2299  * we're done, remove it from the list of this
2300  * transaction's changes. Otherwise it will get
2301  * freed/reused while restoring spooled data from
2302  * disk.
2303  */
2304  Assert(change->data.tp.newtuple != NULL);
2305 
2306  dlist_delete(&change->node);
2307  ReorderBufferToastAppendChunk(rb, txn, relation,
2308  change);
2309  }
2310 
2311  change_done:
2312 
2313  /*
2314  * If speculative insertion was confirmed, the record
2315  * isn't needed anymore.
2316  */
2317  if (specinsert != NULL)
2318  {
2319  ReorderBufferReturnChange(rb, specinsert, true);
2320  specinsert = NULL;
2321  }
2322 
2323  if (RelationIsValid(relation))
2324  {
2325  RelationClose(relation);
2326  relation = NULL;
2327  }
2328  break;
2329 
2331 
2332  /*
2333  * Speculative insertions are dealt with by delaying the
2334  * processing of the insert until the confirmation record
2335  * arrives. For that we simply unlink the record from the
2336  * chain, so it does not get freed/reused while restoring
2337  * spooled data from disk.
2338  *
2339  * This is safe in the face of concurrent catalog changes
2340  * because the relevant relation can't be changed between
2341  * speculative insertion and confirmation due to
2342  * CheckTableNotInUse() and locking.
2343  */
2344 
2345  /* clear out a pending (and thus failed) speculation */
2346  if (specinsert != NULL)
2347  {
2348  ReorderBufferReturnChange(rb, specinsert, true);
2349  specinsert = NULL;
2350  }
2351 
2352  /* and memorize the pending insertion */
2353  dlist_delete(&change->node);
2354  specinsert = change;
2355  break;
2356 
2358 
2359  /*
2360  * Abort for speculative insertion arrived. So cleanup the
2361  * specinsert tuple and toast hash.
2362  *
2363  * Note that we get the spec abort change for each toast
2364  * entry but we need to perform the cleanup only the first
2365  * time we get it for the main table.
2366  */
2367  if (specinsert != NULL)
2368  {
2369  /*
2370  * We must clean the toast hash before processing a
2371  * completely new tuple to avoid confusion about the
2372  * previous tuple's toast chunks.
2373  */
2374  Assert(change->data.tp.clear_toast_afterwards);
2375  ReorderBufferToastReset(rb, txn);
2376 
2377  /* We don't need this record anymore. */
2378  ReorderBufferReturnChange(rb, specinsert, true);
2379  specinsert = NULL;
2380  }
2381  break;
2382 
2384  {
2385  int i;
2386  int nrelids = change->data.truncate.nrelids;
2387  int nrelations = 0;
2388  Relation *relations;
2389 
2390  relations = palloc0(nrelids * sizeof(Relation));
2391  for (i = 0; i < nrelids; i++)
2392  {
2393  Oid relid = change->data.truncate.relids[i];
2394  Relation rel;
2395 
2396  rel = RelationIdGetRelation(relid);
2397 
2398  if (!RelationIsValid(rel))
2399  elog(ERROR, "could not open relation with OID %u", relid);
2400 
2401  if (!RelationIsLogicallyLogged(rel))
2402  continue;
2403 
2404  relations[nrelations++] = rel;
2405  }
2406 
2407  /* Apply the truncate. */
2408  ReorderBufferApplyTruncate(rb, txn, nrelations,
2409  relations, change,
2410  streaming);
2411 
2412  for (i = 0; i < nrelations; i++)
2413  RelationClose(relations[i]);
2414 
2415  break;
2416  }
2417 
2419  ReorderBufferApplyMessage(rb, txn, change, streaming);
2420  break;
2421 
2423  /* Execute the invalidation messages locally */
2424  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2425  change->data.inval.invalidations);
2426  break;
2427 
2429  /* get rid of the old */
2430  TeardownHistoricSnapshot(false);
2431 
2432  if (snapshot_now->copied)
2433  {
2434  ReorderBufferFreeSnap(rb, snapshot_now);
2435  snapshot_now =
2436  ReorderBufferCopySnap(rb, change->data.snapshot,
2437  txn, command_id);
2438  }
2439 
2440  /*
2441  * Restored from disk, need to be careful not to double
2442  * free. We could introduce refcounting for that, but for
2443  * now this seems infrequent enough not to care.
2444  */
2445  else if (change->data.snapshot->copied)
2446  {
2447  snapshot_now =
2448  ReorderBufferCopySnap(rb, change->data.snapshot,
2449  txn, command_id);
2450  }
2451  else
2452  {
2453  snapshot_now = change->data.snapshot;
2454  }
2455 
2456  /* and continue with the new one */
2457  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2458  break;
2459 
2461  Assert(change->data.command_id != InvalidCommandId);
2462 
2463  if (command_id < change->data.command_id)
2464  {
2465  command_id = change->data.command_id;
2466 
2467  if (!snapshot_now->copied)
2468  {
2469  /* we don't use the global one anymore */
2470  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2471  txn, command_id);
2472  }
2473 
2474  snapshot_now->curcid = command_id;
2475 
2476  TeardownHistoricSnapshot(false);
2477  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2478  }
2479 
2480  break;
2481 
2483  elog(ERROR, "tuplecid value in changequeue");
2484  break;
2485  }
2486 
2487  /*
2488  * It is possible that the data is not sent to downstream for a
2489  * long time either because the output plugin filtered it or there
2490  * is a DDL that generates a lot of data that is not processed by
2491  * the plugin. So, in such cases, the downstream can timeout. To
2492  * avoid that we try to send a keepalive message if required.
2493  * Trying to send a keepalive message after every change has some
2494  * overhead, but testing showed there is no noticeable overhead if
2495  * we do it after every ~100 changes.
2496  */
2497 #define CHANGES_THRESHOLD 100
2498 
2499  if (++changes_count >= CHANGES_THRESHOLD)
2500  {
2501  rb->update_progress_txn(rb, txn, change->lsn);
2502  changes_count = 0;
2503  }
2504  }
2505 
2506  /* speculative insertion record must be freed by now */
2507  Assert(!specinsert);
2508 
2509  /* clean up the iterator */
2510  ReorderBufferIterTXNFinish(rb, iterstate);
2511  iterstate = NULL;
2512 
2513  /*
2514  * Update total transaction count and total bytes processed by the
2515  * transaction and its subtransactions. Take care not to count the
2516  * streamed transaction multiple times.
2517  *
2518  * Note that the statistics computation has to be done after
2519  * ReorderBufferIterTXNFinish as it releases the serialized change
2520  * which we have already accounted in ReorderBufferIterTXNNext.
2521  */
2522  if (!rbtxn_is_streamed(txn))
2523  rb->totalTxns++;
2524 
2525  rb->totalBytes += txn->total_size;
2526 
2527  /*
2528  * Done with current changes, send the last message for this set of
2529  * changes depending upon streaming mode.
2530  */
2531  if (streaming)
2532  {
2533  if (stream_started)
2534  {
2535  rb->stream_stop(rb, txn, prev_lsn);
2536  stream_started = false;
2537  }
2538  }
2539  else
2540  {
2541  /*
2542  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2543  * regular ones).
2544  */
2545  if (rbtxn_prepared(txn))
2546  rb->prepare(rb, txn, commit_lsn);
2547  else
2548  rb->commit(rb, txn, commit_lsn);
2549  }
2550 
2551  /* this is just a sanity check against bad output plugin behaviour */
2553  elog(ERROR, "output plugin used XID %u",
2555 
2556  /*
2557  * Remember the command ID and snapshot for the next set of changes in
2558  * streaming mode.
2559  */
2560  if (streaming)
2561  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2562  else if (snapshot_now->copied)
2563  ReorderBufferFreeSnap(rb, snapshot_now);
2564 
2565  /* cleanup */
2566  TeardownHistoricSnapshot(false);
2567 
2568  /*
2569  * Aborting the current (sub-)transaction as a whole has the right
2570  * semantics. We want all locks acquired in here to be released, not
2571  * reassigned to the parent and we do not want any database access
2572  * to have persistent effects.
2573  */
2575 
2576  /* make sure there's no cache pollution */
2578 
2579  if (using_subtxn)
2581 
2582  /*
2583  * We are here due to one of the four reasons: 1. Decoding an
2584  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2585  * prepared txn that was (partially) streamed. 4. Decoding a committed
2586  * txn.
2587  *
2588  * For 1, we allow truncation of txn data by removing the changes
2589  * already streamed but still keeping other things like invalidations,
2590  * snapshot, and tuplecids. For 2 and 3, we indicate
2591  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2592  * data as the entire transaction has been decoded except for commit.
2593  * For 4, as the entire txn has been decoded, we can fully clean up
2594  * the TXN reorder buffer.
2595  */
2596  if (streaming || rbtxn_prepared(txn))
2597  {
2599  /* Reset the CheckXidAlive */
2601  }
2602  else
2603  ReorderBufferCleanupTXN(rb, txn);
2604  }
2605  PG_CATCH();
2606  {
2607  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2608  ErrorData *errdata = CopyErrorData();
2609 
2610  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2611  if (iterstate)
2612  ReorderBufferIterTXNFinish(rb, iterstate);
2613 
2615 
2616  /*
2617  * Force cache invalidation to happen outside of a valid transaction
2618  * to prevent catalog access as we just caught an error.
2619  */
2621 
2622  /* make sure there's no cache pollution */
2624  txn->invalidations);
2625 
2626  if (using_subtxn)
2628 
2629  /*
2630  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2631  * abort of the (sub)transaction we are streaming or preparing. We
2632  * need to do the cleanup and return gracefully on this error, see
2633  * SetupCheckXidLive.
2634  *
2635  * This error code can be thrown by one of the callbacks we call
2636  * during decoding so we need to ensure that we return gracefully only
2637  * when we are sending the data in streaming mode and the streaming is
2638  * not finished yet or when we are sending the data out on a PREPARE
2639  * during a two-phase commit.
2640  */
2641  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2642  (stream_started || rbtxn_prepared(txn)))
2643  {
2644  /* curtxn must be set for streaming or prepared transactions */
2645  Assert(curtxn);
2646 
2647  /* Cleanup the temporary error state. */
2648  FlushErrorState();
2649  FreeErrorData(errdata);
2650  errdata = NULL;
2651  curtxn->concurrent_abort = true;
2652 
2653  /* Reset the TXN so that it is allowed to stream remaining data. */
2654  ReorderBufferResetTXN(rb, txn, snapshot_now,
2655  command_id, prev_lsn,
2656  specinsert);
2657  }
2658  else
2659  {
2660  ReorderBufferCleanupTXN(rb, txn);
2661  MemoryContextSwitchTo(ecxt);
2662  PG_RE_THROW();
2663  }
2664  }
2665  PG_END_TRY();
2666 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:175
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1818
void FlushErrorState(void)
Definition: elog.c:1867
ErrorData * CopyErrorData(void)
Definition: elog.c:1746
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
void * palloc0(Size size)
Definition: mcxt.c:1347
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RelationIsValid(relation)
Definition: rel.h:478
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2061
void RelationClose(Relation relation)
Definition: relcache.c:2192
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:98
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
int sqlerrcode
Definition: elog.h:439
Form_pg_class rd_rel
Definition: rel.h:111
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:98
void StartTransactionCommand(void)
Definition: xact.c:3039
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:470
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:453

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), 
ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3131 of file reorderbuffer.c.

3132 {
3133  /* many records won't have an xid assigned, centralize check here */
3134  if (xid != InvalidTransactionId)
3135  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3136 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange * change,
bool  toast_insert 
)

Definition at line 788 of file reorderbuffer.c.

790 {
791  ReorderBufferTXN *txn;
792 
793  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
794 
795  /*
796  * While streaming the previous changes we have detected that the
797  * transaction is aborted. So there is no point in collecting further
798  * changes for it.
799  */
800  if (txn->concurrent_abort)
801  {
802  /*
803  * We don't need to update memory accounting for this change as we
804  * have not added it to the queue yet.
805  */
806  ReorderBufferReturnChange(rb, change, false);
807  return;
808  }
809 
810  /*
811  * The changes that are sent downstream are considered streamable. We
812  * remember such transactions so that only those will later be considered
813  * for streaming.
814  */
815  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
821  {
822  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
823 
825  }
826 
827  change->lsn = lsn;
828  change->txn = txn;
829 
830  Assert(InvalidXLogRecPtr != lsn);
831  dlist_push_tail(&txn->changes, &change->node);
832  txn->nentries++;
833  txn->nentries_mem++;
834 
835  /* update memory accounting information */
836  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
837  ReorderBufferChangeSize(change));
838 
839  /* process partial change */
840  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
841 
842  /* check the memory limits and evict something if needed */
844 }
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 851 of file reorderbuffer.c.

855 {
856  if (transactional)
857  {
858  MemoryContext oldcontext;
859  ReorderBufferChange *change;
860 
862 
863  /*
864  * We don't expect snapshots for transactional changes - we'll use the
865  * snapshot derived later during apply (unless the change gets
866  * skipped).
867  */
868  Assert(!snap);
869 
870  oldcontext = MemoryContextSwitchTo(rb->context);
871 
872  change = ReorderBufferGetChange(rb);
874  change->data.msg.prefix = pstrdup(prefix);
875  change->data.msg.message_size = message_size;
876  change->data.msg.message = palloc(message_size);
877  memcpy(change->data.msg.message, message, message_size);
878 
879  ReorderBufferQueueChange(rb, xid, lsn, change, false);
880 
881  MemoryContextSwitchTo(oldcontext);
882  }
883  else
884  {
885  ReorderBufferTXN *txn = NULL;
886  volatile Snapshot snapshot_now = snap;
887 
888  /* Non-transactional changes require a valid snapshot. */
889  Assert(snapshot_now);
890 
891  if (xid != InvalidTransactionId)
892  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
893 
894  /* setup snapshot to allow catalog access */
895  SetupHistoricSnapshot(snapshot_now, NULL);
896  PG_TRY();
897  {
898  rb->message(rb, txn, lsn, false, prefix, message_size, message);
899 
901  }
902  PG_CATCH();
903  {
905  PG_RE_THROW();
906  }
907  PG_END_TRY();
908  }
909 }

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2762 of file reorderbuffer.c.

2766 {
2767  ReorderBufferTXN *txn;
2768 
2769  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2770 
2771  /* unknown transaction, nothing to do */
2772  if (txn == NULL)
2773  return false;
2774 
2775  /*
2776  * Remember the prepare information to be later used by commit prepared in
2777  * case we skip doing prepare.
2778  */
2779  txn->final_lsn = prepare_lsn;
2780  txn->end_lsn = end_lsn;
2781  txn->xact_time.prepare_time = prepare_time;
2782  txn->origin_id = origin_id;
2783  txn->origin_lsn = origin_lsn;
2784 
2785  return true;
2786 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN * txn,
ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2679 of file reorderbuffer.c.

2684 {
2685  Snapshot snapshot_now;
2686  CommandId command_id = FirstCommandId;
2687 
2688  txn->final_lsn = commit_lsn;
2689  txn->end_lsn = end_lsn;
2690  txn->xact_time.commit_time = commit_time;
2691  txn->origin_id = origin_id;
2692  txn->origin_lsn = origin_lsn;
2693 
2694  /*
2695  * If the transaction was (partially) streamed, we need to commit it in a
2696  * 'streamed' way. That is, we first stream the remaining part of the
2697  * transaction, and then invoke stream_commit message.
2698  *
2699  * Called after everything (origin ID, LSN, ...) is stored in the
2700  * transaction to avoid passing that information directly.
2701  */
2702  if (rbtxn_is_streamed(txn))
2703  {
2704  ReorderBufferStreamCommit(rb, txn);
2705  return;
2706  }
2707 
2708  /*
2709  * If this transaction has no snapshot, it didn't make any changes to the
2710  * database, so there's nothing to decode. Note that
2711  * ReorderBufferCommitChild will have transferred any snapshots from
2712  * subtransactions if there were any.
2713  */
2714  if (txn->base_snapshot == NULL)
2715  {
2716  Assert(txn->ninvalidations == 0);
2717 
2718  /*
2719  * Removing this txn before a commit might result in the computation
2720  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2721  */
2722  if (!rbtxn_prepared(txn))
2723  ReorderBufferCleanupTXN(rb, txn);
2724  return;
2725  }
2726 
2727  snapshot_now = txn->base_snapshot;
2728 
2729  /* Process and send the changes to output plugin. */
2730  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2731  command_id, false);
2732 }
#define FirstCommandId
Definition: c.h:659
uint32 CommandId
Definition: c.h:657
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange * specinsert 
)
static

Definition at line 2063 of file reorderbuffer.c.

2068 {
2069  /* Discard the changes that we just streamed */
2070  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2071 
2072  /* Free all resources allocated for toast reconstruction */
2073  ReorderBufferToastReset(rb, txn);
2074 
2075  /* Return the spec insert change if it is not NULL */
2076  if (specinsert != NULL)
2077  {
2078  ReorderBufferReturnChange(rb, specinsert, true);
2079  specinsert = NULL;
2080  }
2081 
2082  /*
2083  * For the streaming case, stop the stream and remember the command ID and
2084  * snapshot for the streaming run.
2085  */
2086  if (rbtxn_is_streamed(txn))
2087  {
2088  rb->stream_stop(rb, txn, last_lsn);
2089  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2090  }
2091 
2092  /* All changes must be deallocated */
2093  Assert(txn->size == 0);
2094 }

References Assert, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
char *  data 
)
static

Definition at line 4400 of file reorderbuffer.c.

4402 {
4403  ReorderBufferDiskChange *ondisk;
4404  ReorderBufferChange *change;
4405 
4406  ondisk = (ReorderBufferDiskChange *) data;
4407 
4408  change = ReorderBufferGetChange(rb);
4409 
4410  /* copy static part */
4411  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4412 
4413  data += sizeof(ReorderBufferDiskChange);
4414 
4415  /* restore individual stuff */
4416  switch (change->action)
4417  {
4418  /* fall through these, they're all similar enough */
4423  if (change->data.tp.oldtuple)
4424  {
4425  uint32 tuplelen = ((HeapTuple) data)->t_len;
4426 
4427  change->data.tp.oldtuple =
4429 
4430  /* restore ->tuple */
4431  memcpy(change->data.tp.oldtuple, data,
4432  sizeof(HeapTupleData));
4433  data += sizeof(HeapTupleData);
4434 
4435  /* reset t_data pointer into the new tuplebuf */
4436  change->data.tp.oldtuple->t_data =
4437  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4438 
4439  /* restore tuple data itself */
4440  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4441  data += tuplelen;
4442  }
4443 
4444  if (change->data.tp.newtuple)
4445  {
4446  /* here, data might not be suitably aligned! */
4447  uint32 tuplelen;
4448 
4449  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4450  sizeof(uint32));
4451 
4452  change->data.tp.newtuple =
4454 
4455  /* restore ->tuple */
4456  memcpy(change->data.tp.newtuple, data,
4457  sizeof(HeapTupleData));
4458  data += sizeof(HeapTupleData);
4459 
4460  /* reset t_data pointer into the new tuplebuf */
4461  change->data.tp.newtuple->t_data =
4462  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4463 
4464  /* restore tuple data itself */
4465  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4466  data += tuplelen;
4467  }
4468 
4469  break;
4471  {
4472  Size prefix_size;
4473 
4474  /* read prefix */
4475  memcpy(&prefix_size, data, sizeof(Size));
4476  data += sizeof(Size);
4477  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4478  prefix_size);
4479  memcpy(change->data.msg.prefix, data, prefix_size);
4480  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4481  data += prefix_size;
4482 
4483  /* read the message */
4484  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4485  data += sizeof(Size);
4486  change->data.msg.message = MemoryContextAlloc(rb->context,
4487  change->data.msg.message_size);
4488  memcpy(change->data.msg.message, data,
4489  change->data.msg.message_size);
4490  data += change->data.msg.message_size;
4491 
4492  break;
4493  }
4495  {
4496  Size inval_size = sizeof(SharedInvalidationMessage) *
4497  change->data.inval.ninvalidations;
4498 
4499  change->data.inval.invalidations =
4500  MemoryContextAlloc(rb->context, inval_size);
4501 
4502  /* read the message */
4503  memcpy(change->data.inval.invalidations, data, inval_size);
4504 
4505  break;
4506  }
4508  {
4509  Snapshot oldsnap;
4510  Snapshot newsnap;
4511  Size size;
4512 
4513  oldsnap = (Snapshot) data;
4514 
4515  size = sizeof(SnapshotData) +
4516  sizeof(TransactionId) * oldsnap->xcnt +
4517  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4518 
4520 
4521  newsnap = change->data.snapshot;
4522 
4523  memcpy(newsnap, data, size);
4524  newsnap->xip = (TransactionId *)
4525  (((char *) newsnap) + sizeof(SnapshotData));
4526  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4527  newsnap->copied = true;
4528  break;
4529  }
4530  /* the base struct contains all the data, easy peasy */
4532  {
4533  Oid *relids;
4534 
4535  relids = ReorderBufferGetRelids(rb,
4536  change->data.truncate.nrelids);
4537  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4538  change->data.truncate.relids = relids;
4539 
4540  break;
4541  }
4546  break;
4547  }
4548 
4549  dlist_push_tail(&txn->changes, &change->node);
4550  txn->nentries_mem++;
4551 
4552  /*
4553  * Update memory accounting for the restored change. We need to do this
4554  * although we don't check the memory limit when restoring the changes in
4555  * this branch (we only do that when initially queueing the changes after
4556  * decoding), because we will release the changes later, and that will
4557  * update the accounting too (subtracting the size from the counters). And
4558  * we don't want to underflow there.
4559  */
4560  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4561  ReorderBufferChangeSize(change));
4562 }
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), size, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
TXNEntryFile * file,
XLogSegNo * segno 
)
static

Definition at line 4257 of file reorderbuffer.c.

4259 {
4260  Size restored = 0;
4261  XLogSegNo last_segno;
4262  dlist_mutable_iter cleanup_iter;
4263  File *fd = &file->vfd;
4264 
4267 
4268  /* free current entries, so we have memory for more */
4269  dlist_foreach_modify(cleanup_iter, &txn->changes)
4270  {
4272  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4273 
4274  dlist_delete(&cleanup->node);
4276  }
4277  txn->nentries_mem = 0;
4278  Assert(dlist_is_empty(&txn->changes));
4279 
4280  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4281 
4282  while (restored < max_changes_in_memory && *segno <= last_segno)
4283  {
4284  int readBytes;
4285  ReorderBufferDiskChange *ondisk;
4286 
4288 
4289  if (*fd == -1)
4290  {
4291  char path[MAXPGPATH];
4292 
4293  /* first time in */
4294  if (*segno == 0)
4295  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4296 
4297  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4298 
4299  /*
4300  * No need to care about TLIs here, only used during a single run,
4301  * so each LSN only maps to a specific WAL record.
4302  */
4304  *segno);
4305 
4306  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4307 
4308  /* No harm in resetting the offset even in case of failure */
4309  file->curOffset = 0;
4310 
4311  if (*fd < 0 && errno == ENOENT)
4312  {
4313  *fd = -1;
4314  (*segno)++;
4315  continue;
4316  }
4317  else if (*fd < 0)
4318  ereport(ERROR,
4320  errmsg("could not open file \"%s\": %m",
4321  path)));
4322  }
4323 
4324  /*
4325  * Read the statically sized part of a change which has information
4326  * about the total size. If we couldn't read a record, we're at the
4327  * end of this file.
4328  */
4330  readBytes = FileRead(file->vfd, rb->outbuf,
4331  sizeof(ReorderBufferDiskChange),
4332  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4333 
4334  /* eof */
4335  if (readBytes == 0)
4336  {
4337  FileClose(*fd);
4338  *fd = -1;
4339  (*segno)++;
4340  continue;
4341  }
4342  else if (readBytes < 0)
4343  ereport(ERROR,
4345  errmsg("could not read from reorderbuffer spill file: %m")));
4346  else if (readBytes != sizeof(ReorderBufferDiskChange))
4347  ereport(ERROR,
4349  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4350  readBytes,
4351  (uint32) sizeof(ReorderBufferDiskChange))));
4352 
4353  file->curOffset += readBytes;
4354 
4355  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4356 
4358  sizeof(ReorderBufferDiskChange) + ondisk->size);
4359  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4360 
4361  readBytes = FileRead(file->vfd,
4362  rb->outbuf + sizeof(ReorderBufferDiskChange),
4363  ondisk->size - sizeof(ReorderBufferDiskChange),
4364  file->curOffset,
4365  WAIT_EVENT_REORDER_BUFFER_READ);
4366 
4367  if (readBytes < 0)
4368  ereport(ERROR,
4370  errmsg("could not read from reorderbuffer spill file: %m")));
4371  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4372  ereport(ERROR,
4374  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4375  readBytes,
4376  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4377 
4378  file->curOffset += readBytes;
4379 
4380  /*
4381  * ok, read a full change from disk, now restore it into proper
4382  * in-memory format
4383  */
4384  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4385  restored++;
4386  }
4387 
4388  return restored;
4389 }
static void cleanup(void)
Definition: bootstrap.c:683
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:142
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4568 of file reorderbuffer.c.

4569 {
4570  XLogSegNo first;
4571  XLogSegNo cur;
4572  XLogSegNo last;
4573 
4576 
4577  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4578  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4579 
4580  /* iterate over all possible filenames, and delete them */
4581  for (cur = first; cur <= last; cur++)
4582  {
4583  char path[MAXPGPATH];
4584 
4586  if (unlink(path) != 0 && errno != ENOENT)
4587  ereport(ERROR,
4589  errmsg("could not remove file \"%s\": %m", path)));
4590  }
4591 }
struct cursor * cur
Definition: ecpg.c:28

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * rb,
ReorderBufferChange * change,
bool  upd_mem 
)

Definition at line 500 of file reorderbuffer.c.

502 {
503  /* update memory accounting info */
504  if (upd_mem)
505  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
506  ReorderBufferChangeSize(change));
507 
508  /* free contained data */
509  switch (change->action)
510  {
515  if (change->data.tp.newtuple)
516  {
517  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
518  change->data.tp.newtuple = NULL;
519  }
520 
521  if (change->data.tp.oldtuple)
522  {
523  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
524  change->data.tp.oldtuple = NULL;
525  }
526  break;
528  if (change->data.msg.prefix != NULL)
529  pfree(change->data.msg.prefix);
530  change->data.msg.prefix = NULL;
531  if (change->data.msg.message != NULL)
532  pfree(change->data.msg.message);
533  change->data.msg.message = NULL;
534  break;
536  if (change->data.inval.invalidations)
537  pfree(change->data.inval.invalidations);
538  change->data.inval.invalidations = NULL;
539  break;
541  if (change->data.snapshot)
542  {
543  ReorderBufferFreeSnap(rb, change->data.snapshot);
544  change->data.snapshot = NULL;
545  }
546  break;
547  /* no data in addition to the struct itself */
549  if (change->data.truncate.relids != NULL)
550  {
551  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
552  change->data.truncate.relids = NULL;
553  }
554  break;
559  break;
560  }
561 
562  pfree(change);
563 }
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * rb,
Oid * relids 
)

Definition at line 619 of file reorderbuffer.c.

620 {
621  pfree(relids);
622 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 588 of file reorderbuffer.c.

589 {
590  pfree(tuple);
591 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 443 of file reorderbuffer.c.

444 {
445  /* clean the lookup cache if we were cached (quite likely) */
446  if (rb->by_txn_last_xid == txn->xid)
447  {
448  rb->by_txn_last_xid = InvalidTransactionId;
449  rb->by_txn_last_txn = NULL;
450  }
451 
452  /* free data that's contained */
453 
454  if (txn->gid != NULL)
455  {
456  pfree(txn->gid);
457  txn->gid = NULL;
458  }
459 
460  if (txn->tuplecid_hash != NULL)
461  {
462  hash_destroy(txn->tuplecid_hash);
463  txn->tuplecid_hash = NULL;
464  }
465 
466  if (txn->invalidations)
467  {
468  pfree(txn->invalidations);
469  txn->invalidations = NULL;
470  }
471 
472  /* Reset the toast hash */
473  ReorderBufferToastReset(rb, txn);
474 
475  /* All changes must be deallocated */
476  Assert(txn->size == 0);
477 
478  pfree(txn);
479 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2042 of file reorderbuffer.c.

2044 {
2045  txn->command_id = command_id;
2046 
2047  /* Avoid copying if it's already copied. */
2048  if (snapshot_now->copied)
2049  txn->snapshot_now = snapshot_now;
2050  else
2051  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2052  txn, command_id);
2053 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
int  fd,
ReorderBufferChange * change 
)
static

Definition at line 3805 of file reorderbuffer.c.

3807 {
3808  ReorderBufferDiskChange *ondisk;
3809  Size sz = sizeof(ReorderBufferDiskChange);
3810 
3812 
3813  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3814  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3815 
3816  switch (change->action)
3817  {
3818  /* fall through these, they're all similar enough */
3823  {
3824  char *data;
3825  HeapTuple oldtup,
3826  newtup;
3827  Size oldlen = 0;
3828  Size newlen = 0;
3829 
3830  oldtup = change->data.tp.oldtuple;
3831  newtup = change->data.tp.newtuple;
3832 
3833  if (oldtup)
3834  {
3835  sz += sizeof(HeapTupleData);
3836  oldlen = oldtup->t_len;
3837  sz += oldlen;
3838  }
3839 
3840  if (newtup)
3841  {
3842  sz += sizeof(HeapTupleData);
3843  newlen = newtup->t_len;
3844  sz += newlen;
3845  }
3846 
3847  /* make sure we have enough space */
3849 
3850  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3851  /* might have been reallocated above */
3852  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3853 
3854  if (oldlen)
3855  {
3856  memcpy(data, oldtup, sizeof(HeapTupleData));
3857  data += sizeof(HeapTupleData);
3858 
3859  memcpy(data, oldtup->t_data, oldlen);
3860  data += oldlen;
3861  }
3862 
3863  if (newlen)
3864  {
3865  memcpy(data, newtup, sizeof(HeapTupleData));
3866  data += sizeof(HeapTupleData);
3867 
3868  memcpy(data, newtup->t_data, newlen);
3869  data += newlen;
3870  }
3871  break;
3872  }
3874  {
3875  char *data;
3876  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3877 
3878  sz += prefix_size + change->data.msg.message_size +
3879  sizeof(Size) + sizeof(Size);
3881 
3882  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3883 
3884  /* might have been reallocated above */
3885  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3886 
3887  /* write the prefix including the size */
3888  memcpy(data, &prefix_size, sizeof(Size));
3889  data += sizeof(Size);
3890  memcpy(data, change->data.msg.prefix,
3891  prefix_size);
3892  data += prefix_size;
3893 
3894  /* write the message including the size */
3895  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3896  data += sizeof(Size);
3897  memcpy(data, change->data.msg.message,
3898  change->data.msg.message_size);
3899  data += change->data.msg.message_size;
3900 
3901  break;
3902  }
3904  {
3905  char *data;
3906  Size inval_size = sizeof(SharedInvalidationMessage) *
3907  change->data.inval.ninvalidations;
3908 
3909  sz += inval_size;
3910 
3912  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3913 
3914  /* might have been reallocated above */
3915  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3916  memcpy(data, change->data.inval.invalidations, inval_size);
3917  data += inval_size;
3918 
3919  break;
3920  }
3922  {
3923  Snapshot snap;
3924  char *data;
3925 
3926  snap = change->data.snapshot;
3927 
3928  sz += sizeof(SnapshotData) +
3929  sizeof(TransactionId) * snap->xcnt +
3930  sizeof(TransactionId) * snap->subxcnt;
3931 
3932  /* make sure we have enough space */
3934  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3935  /* might have been reallocated above */
3936  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937 
3938  memcpy(data, snap, sizeof(SnapshotData));
3939  data += sizeof(SnapshotData);
3940 
3941  if (snap->xcnt)
3942  {
3943  memcpy(data, snap->xip,
3944  sizeof(TransactionId) * snap->xcnt);
3945  data += sizeof(TransactionId) * snap->xcnt;
3946  }
3947 
3948  if (snap->subxcnt)
3949  {
3950  memcpy(data, snap->subxip,
3951  sizeof(TransactionId) * snap->subxcnt);
3952  data += sizeof(TransactionId) * snap->subxcnt;
3953  }
3954  break;
3955  }
3957  {
3958  Size size;
3959  char *data;
3960 
3961  /* account for the OIDs of truncated relations */
3962  size = sizeof(Oid) * change->data.truncate.nrelids;
3963  sz += size;
3964 
3965  /* make sure we have enough space */
3967 
3968  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3969  /* might have been reallocated above */
3970  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3971 
3972  memcpy(data, change->data.truncate.relids, size);
3973  data += size;
3974 
3975  break;
3976  }
3981  /* ReorderBufferChange contains everything important */
3982  break;
3983  }
3984 
3985  ondisk->size = sz;
3986 
3987  errno = 0;
3988  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3989  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3990  {
3991  int save_errno = errno;
3992 
3994 
3995  /* if write didn't set errno, assume problem is no disk space */
3996  errno = save_errno ? save_errno : ENOSPC;
3997  ereport(ERROR,
3999  errmsg("could not write to data file for XID %u: %m",
4000  txn->xid)));
4001  }
4003 
4004  /*
4005  * Keep the transaction's final_lsn up to date with each change we send to
4006  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4007  * only do this on commit and abort records, but that doesn't work if a
4008  * system crash leaves a transaction without its abort record).
4009  *
4010  * Make sure not to move it backwards.
4011  */
4012  if (txn->final_lsn < change->lsn)
4013  txn->final_lsn = change->lsn;
4014 
4015  Assert(ondisk->change.action == change->action);
4016 }
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot * slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4637 of file reorderbuffer.c.

4639 {
4640  XLogRecPtr recptr;
4641 
4642  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4643 
4644  snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4647  xid, LSN_FORMAT_ARGS(recptr));
4648 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer * rb,
Size  sz 
)
static

Definition at line 3520 of file reorderbuffer.c.

3521 {
3522  if (!rb->outbufsize)
3523  {
3524  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3525  rb->outbufsize = sz;
3526  }
3527  else if (rb->outbufsize < sz)
3528  {
3529  rb->outbuf = repalloc(rb->outbuf, sz);
3530  rb->outbufsize = sz;
3531  }
3532 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 3710 of file reorderbuffer.c.

3711 {
3712  dlist_iter subtxn_i;
3713  dlist_mutable_iter change_i;
3714  int fd = -1;
3715  XLogSegNo curOpenSegNo = 0;
3716  Size spilled = 0;
3717  Size size = txn->size;
3718 
3719  elog(DEBUG2, "spill %u changes in XID %u to disk",
3720  (uint32) txn->nentries_mem, txn->xid);
3721 
3722  /* do the same to all child TXs */
3723  dlist_foreach(subtxn_i, &txn->subtxns)
3724  {
3725  ReorderBufferTXN *subtxn;
3726 
3727  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3728  ReorderBufferSerializeTXN(rb, subtxn);
3729  }
3730 
3731  /* serialize changestream */
3732  dlist_foreach_modify(change_i, &txn->changes)
3733  {
3734  ReorderBufferChange *change;
3735 
3736  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3737 
3738  /*
3739  * store in segment in which it belongs by start lsn, don't split over
3740  * multiple segments though
3741  */
3742  if (fd == -1 ||
3743  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3744  {
3745  char path[MAXPGPATH];
3746 
3747  if (fd != -1)
3749 
3750  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3751 
3752  /*
3753  * No need to care about TLIs here, only used during a single run,
3754  * so each LSN only maps to a specific WAL record.
3755  */
3757  curOpenSegNo);
3758 
3759  /* open segment, create it if necessary */
3760  fd = OpenTransientFile(path,
3761  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3762 
3763  if (fd < 0)
3764  ereport(ERROR,
3766  errmsg("could not open file \"%s\": %m", path)));
3767  }
3768 
3769  ReorderBufferSerializeChange(rb, txn, fd, change);
3770  dlist_delete(&change->node);
3771  ReorderBufferReturnChange(rb, change, false);
3772 
3773  spilled++;
3774  }
3775 
3776  /* Update the memory counter */
3777  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3778 
3779  /* update the statistics iff we have spilled anything */
3780  if (spilled)
3781  {
3782  rb->spillCount += 1;
3783  rb->spillBytes += size;
3784 
3785  /* don't consider already serialized transactions */
3786  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3787 
3788  /* update the decoding stats */
3790  }
3791 
3792  Assert(spilled == txn->nentries_mem);
3793  Assert(dlist_is_empty(&txn->changes));
3794  txn->nentries_mem = 0;
3796 
3797  if (fd != -1)
3799 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1932
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), size, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3162 of file reorderbuffer.c.

3164 {
3165  ReorderBufferTXN *txn;
3166  bool is_new;
3167 
3168  Assert(snap != NULL);
3169 
3170  /*
3171  * Fetch the transaction to operate on. If we know it's a subtransaction,
3172  * operate on its top-level transaction instead.
3173  */
3174  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3175  if (rbtxn_is_known_subxact(txn))
3176  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3177  NULL, InvalidXLogRecPtr, false);
3178  Assert(txn->base_snapshot == NULL);
3179 
3180  txn->base_snapshot = snap;
3181  txn->base_snapshot_lsn = lsn;
3183 
3184  AssertTXNLsnOrder(rb);
3185 }

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * rb,
XLogRecPtr  ptr 
)

Definition at line 1065 of file reorderbuffer.c.

1066 {
1067  rb->current_restart_decoding_lsn = ptr;
1068 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 2790 of file reorderbuffer.c.

2791 {
2792  ReorderBufferTXN *txn;
2793 
2794  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2795 
2796  /* unknown transaction, nothing to do */
2797  if (txn == NULL)
2798  return;
2799 
2801 }
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1907 of file reorderbuffer.c.

1908 {
1909  /* we should only call this for previously streamed transactions */
1910  Assert(rbtxn_is_streamed(txn));
1911 
1912  ReorderBufferStreamTXN(rb, txn);
1913 
1914  if (rbtxn_prepared(txn))
1915  {
1916  /*
1917  * Note, we send stream prepare even if a concurrent abort is
1918  * detected. See DecodePrepare for more information.
1919  */
1920  rb->stream_prepare(rb, txn, txn->final_lsn);
1921 
1922  /*
1923  * This is a PREPARED transaction, part of a two-phase commit. The
1924  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1925  * just truncate txn by removing changes and tuplecids.
1926  */
1927  ReorderBufferTruncateTXN(rb, txn, true);
1928  /* Reset the CheckXidAlive */
1930  }
1931  else
1932  {
1933  rb->stream_commit(rb, txn, txn->final_lsn);
1934  ReorderBufferCleanupTXN(rb, txn);
1935  }
1936 }
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert, CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4055 of file reorderbuffer.c.

4056 {
4057  Snapshot snapshot_now;
4058  CommandId command_id;
4059  Size stream_bytes;
4060  bool txn_is_streamed;
4061 
4062  /* We can never reach here for a subtransaction. */
4063  Assert(rbtxn_is_toptxn(txn));
4064 
4065  /*
4066  * We can't make any assumptions about base snapshot here, similar to what
4067  * ReorderBufferCommit() does. That relies on base_snapshot getting
4068  * transferred from subxact in ReorderBufferCommitChild(), but that was
4069  * not yet called as the transaction is in-progress.
4070  *
4071  * So just walk the subxacts and use the same logic here. But we only need
4072  * to do that once, when the transaction is streamed for the first time.
4073  * After that we need to reuse the snapshot from the previous run.
4074  *
4075  * Unlike DecodeCommit which adds xids of all the subtransactions in
4076  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4077  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4078  * allows the catalog changes made in subtransactions decoded till now to
4079  * be visible.
4080  */
4081  if (txn->snapshot_now == NULL)
4082  {
4083  dlist_iter subxact_i;
4084 
4085  /* make sure this transaction is streamed for the first time */
4086  Assert(!rbtxn_is_streamed(txn));
4087 
4088  /* at the beginning we should have invalid command ID */
4090 
4091  dlist_foreach(subxact_i, &txn->subtxns)
4092  {
4093  ReorderBufferTXN *subtxn;
4094 
4095  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4096  ReorderBufferTransferSnapToParent(txn, subtxn);
4097  }
4098 
4099  /*
4100  * If this transaction has no snapshot, it didn't make any changes to
4101  * the database till now, so there's nothing to decode.
4102  */
4103  if (txn->base_snapshot == NULL)
4104  {
4105  Assert(txn->ninvalidations == 0);
4106  return;
4107  }
4108 
4109  command_id = FirstCommandId;
4110  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4111  txn, command_id);
4112  }
4113  else
4114  {
4115  /* the transaction must have been already streamed */
4116  Assert(rbtxn_is_streamed(txn));
4117 
4118  /*
4119  * Nah, we already have snapshot from the previous streaming run. We
4120  * assume new subxacts can't move the LSN backwards, and so can't beat
4121  * the LSN condition in the previous branch (so no need to walk
4122  * through subxacts again). In fact, we must not do that as we may be
4123  * using snapshot half-way through the subxact.
4124  */
4125  command_id = txn->command_id;
4126 
4127  /*
4128  * We can't use txn->snapshot_now directly because after the last
4129  * streaming run, we might have got some new sub-transactions. So we
4130  * need to add them to the snapshot.
4131  */
4132  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4133  txn, command_id);
4134 
4135  /* Free the previously copied snapshot. */
4136  Assert(txn->snapshot_now->copied);
4138  txn->snapshot_now = NULL;
4139  }
4140 
4141  /*
4142  * Remember this information to be used later to update stats. We can't
4143  * update the stats here as an error while processing the changes would
4144  * lead to the accumulation of stats even though we haven't streamed all
4145  * the changes.
4146  */
4147  txn_is_streamed = rbtxn_is_streamed(txn);
4148  stream_bytes = txn->total_size;
4149 
4150  /* Process and send the changes to output plugin. */
4151  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4152  command_id, true);
4153 
4154  rb->streamCount += 1;
4155  rb->streamBytes += stream_bytes;
4156 
4157  /* Don't consider already streamed transaction. */
4158  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4159 
4160  /* update the decoding stats */
4162 
4163  Assert(dlist_is_empty(&txn->changes));
4164  Assert(txn->nentries == 0);
4165  Assert(txn->nentries_mem == 0);
4166 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change 
)
static

Definition at line 4709 of file reorderbuffer.c.

4711 {
4712  ReorderBufferToastEnt *ent;
4713  HeapTuple newtup;
4714  bool found;
4715  int32 chunksize;
4716  bool isnull;
4717  Pointer chunk;
4718  TupleDesc desc = RelationGetDescr(relation);
4719  Oid chunk_id;
4720  int32 chunk_seq;
4721 
4722  if (txn->toast_hash == NULL)
4723  ReorderBufferToastInitHash(rb, txn);
4724 
4725  Assert(IsToastRelation(relation));
4726 
4727  newtup = change->data.tp.newtuple;
4728  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4729  Assert(!isnull);
4730  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4731  Assert(!isnull);
4732 
4733  ent = (ReorderBufferToastEnt *)
4734  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4735 
4736  if (!found)
4737  {
4738  Assert(ent->chunk_id == chunk_id);
4739  ent->num_chunks = 0;
4740  ent->last_chunk_seq = 0;
4741  ent->size = 0;
4742  ent->reconstructed = NULL;
4743  dlist_init(&ent->chunks);
4744 
4745  if (chunk_seq != 0)
4746  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4747  chunk_seq, chunk_id);
4748  }
4749  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4750  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4751  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4752 
4753  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4754  Assert(!isnull);
4755 
4756  /* calculate size so we can allocate the right size at once later */
4757  if (!VARATT_IS_EXTENDED(chunk))
4758  chunksize = VARSIZE(chunk) - VARHDRSZ;
4759  else if (VARATT_IS_SHORT(chunk))
4760  /* could happen due to heap_form_tuple doing its thing */
4761  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4762  else
4763  elog(ERROR, "unexpected type of toast chunk");
4764 
4765  ent->size += chunksize;
4766  ent->last_chunk_seq = chunk_seq;
4767  ent->num_chunks++;
4768  dlist_push_tail(&ent->chunks, &change->node);
4769 }
char * Pointer
Definition: c.h:486
#define VARHDRSZ
Definition: c.h:683
uint64 chunk
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:749
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define RelationGetDescr(relation)
Definition: rel.h:531
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define VARSIZE(PTR)
Definition: varatt.h:279

References Assert, chunk, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4689 of file reorderbuffer.c.

4690 {
4691  HASHCTL hash_ctl;
4692 
4693  Assert(txn->toast_hash == NULL);
4694 
4695  hash_ctl.keysize = sizeof(Oid);
4696  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4697  hash_ctl.hcxt = rb->context;
4698  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4700 }
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change 
)
static

Definition at line 4792 of file reorderbuffer.c.

4794 {
4795  TupleDesc desc;
4796  int natt;
4797  Datum *attrs;
4798  bool *isnull;
4799  bool *free;
4800  HeapTuple tmphtup;
4801  Relation toast_rel;
4802  TupleDesc toast_desc;
4803  MemoryContext oldcontext;
4804  HeapTuple newtup;
4805  Size old_size;
4806 
4807  /* no toast tuples changed */
4808  if (txn->toast_hash == NULL)
4809  return;
4810 
4811  /*
4812  * We're going to modify the size of the change. So, to make sure the
4813  * accounting is correct we record the current change size and then after
4814  * re-computing the change we'll subtract the recorded size and then
4815  * re-add the new change size at the end. We don't immediately subtract
4816  * the old size because if there is any error before we add the new size,
4817  * we will release the changes and that will update the accounting info
4818  * (subtracting the size from the counters). And we don't want to
4819  * underflow there.
4820  */
4821  old_size = ReorderBufferChangeSize(change);
4822 
4823  oldcontext = MemoryContextSwitchTo(rb->context);
4824 
4825  /* we should only have toast tuples in an INSERT or UPDATE */
4826  Assert(change->data.tp.newtuple);
4827 
4828  desc = RelationGetDescr(relation);
4829 
4830  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4831  if (!RelationIsValid(toast_rel))
4832  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4833  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4834 
4835  toast_desc = RelationGetDescr(toast_rel);
4836 
4837  /* should we allocate from stack instead? */
4838  attrs = palloc0(sizeof(Datum) * desc->natts);
4839  isnull = palloc0(sizeof(bool) * desc->natts);
4840  free = palloc0(sizeof(bool) * desc->natts);
4841 
4842  newtup = change->data.tp.newtuple;
4843 
4844  heap_deform_tuple(newtup, desc, attrs, isnull);
4845 
4846  for (natt = 0; natt < desc->natts; natt++)
4847  {
4848  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4849  ReorderBufferToastEnt *ent;
4850  struct varlena *varlena;
4851 
4852  /* va_rawsize is the size of the original datum -- including header */
4853  struct varatt_external toast_pointer;
4854  struct varatt_indirect redirect_pointer;
4855  struct varlena *new_datum = NULL;
4856  struct varlena *reconstructed;
4857  dlist_iter it;
4858  Size data_done = 0;
4859 
4860  /* system columns aren't toasted */
4861  if (attr->attnum < 0)
4862  continue;
4863 
4864  if (attr->attisdropped)
4865  continue;
4866 
4867  /* not a varlena datatype */
4868  if (attr->attlen != -1)
4869  continue;
4870 
4871  /* no data */
4872  if (isnull[natt])
4873  continue;
4874 
4875  /* ok, we know we have a toast datum */
4876  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4877 
4878  /* no need to do anything if the tuple isn't external */
4880  continue;
4881 
4882  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4883 
4884  /*
4885  * Check whether the toast tuple changed, replace if so.
4886  */
4887  ent = (ReorderBufferToastEnt *)
4888  hash_search(txn->toast_hash,
4889  &toast_pointer.va_valueid,
4890  HASH_FIND,
4891  NULL);
4892  if (ent == NULL)
4893  continue;
4894 
4895  new_datum =
4897 
4898  free[natt] = true;
4899 
4900  reconstructed = palloc0(toast_pointer.va_rawsize);
4901 
4902  ent->reconstructed = reconstructed;
4903 
4904  /* stitch toast tuple back together from its parts */
4905  dlist_foreach(it, &ent->chunks)
4906  {
4907  bool cisnull;
4908  ReorderBufferChange *cchange;
4909  HeapTuple ctup;
4910  Pointer chunk;
4911 
4912  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4913  ctup = cchange->data.tp.newtuple;
4914  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4915 
4916  Assert(!cisnull);
4919 
4920  memcpy(VARDATA(reconstructed) + data_done,
4921  VARDATA(chunk),
4922  VARSIZE(chunk) - VARHDRSZ);
4923  data_done += VARSIZE(chunk) - VARHDRSZ;
4924  }
4925  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4926 
4927  /* make sure it's marked as compressed or not */
4928  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4929  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4930  else
4931  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4932 
4933  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4934  redirect_pointer.pointer = reconstructed;
4935 
4937  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4938  sizeof(redirect_pointer));
4939 
4940  attrs[natt] = PointerGetDatum(new_datum);
4941  }
4942 
4943  /*
4944  * Build tuple in separate memory & copy tuple back into the tuplebuf
4945  * passed to the output plugin. We can't directly heap_fill_tuple() into
4946  * the tuplebuf because attrs[] will point back into the current content.
4947  */
4948  tmphtup = heap_form_tuple(desc, attrs, isnull);
4949  Assert(newtup->t_len <= MaxHeapTupleSize);
4950  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4951 
4952  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4953  newtup->t_len = tmphtup->t_len;
4954 
4955  /*
4956  * free resources we won't further need, more persistent stuff will be
4957  * free'd in ReorderBufferToastReset().
4958  */
4959  RelationClose(toast_rel);
4960  pfree(tmphtup);
4961  for (natt = 0; natt < desc->natts; natt++)
4962  {
4963  if (free[natt])
4964  pfree(DatumGetPointer(attrs[natt]));
4965  }
4966  pfree(attrs);
4967  pfree(free);
4968  pfree(isnull);
4969 
4970  MemoryContextSwitchTo(oldcontext);
4971 
4972  /* subtract the old change size */
4973  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
4974  /* now add the change back, with the correct size */
4975  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4976  ReorderBufferChangeSize(change));
4977 }
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1345
#define MaxHeapTupleSize
Definition: htup_details.h:558
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define RelationGetRelationName(relation)
Definition: rel.h:539
Definition: c.h:678
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86

References Assert, chunk, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescAttr, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4983 of file reorderbuffer.c.

4984 {
4985  HASH_SEQ_STATUS hstat;
4986  ReorderBufferToastEnt *ent;
4987 
4988  if (txn->toast_hash == NULL)
4989  return;
4990 
4991  /* sequentially walk over the hash and free everything */
4992  hash_seq_init(&hstat, txn->toast_hash);
4993  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4994  {
4995  dlist_mutable_iter it;
4996 
4997  if (ent->reconstructed != NULL)
4998  pfree(ent->reconstructed);
4999 
5000  dlist_foreach_modify(it, &ent->chunks)
5001  {
5002  ReorderBufferChange *change =
5004 
5005  dlist_delete(&change->node);
5006  ReorderBufferReturnChange(rb, change, true);
5007  }
5008  }
5009 
5010  hash_destroy(txn->toast_hash);
5011  txn->toast_hash = NULL;
5012 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferReturnTXN().

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN * txn,
ReorderBufferTXN * subtxn 
)
static

Definition at line 1143 of file reorderbuffer.c.

1145 {
1146  Assert(subtxn->toplevel_xid == txn->xid);
1147 
1148  if (subtxn->base_snapshot != NULL)
1149  {
1150  if (txn->base_snapshot == NULL ||
1151  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1152  {
1153  /*
1154  * If the toplevel transaction already has a base snapshot but
1155  * it's newer than the subxact's, purge it.
1156  */
1157  if (txn->base_snapshot != NULL)
1158  {
1161  }
1162 
1163  /*
1164  * The snapshot is now the top transaction's; transfer it, and
1165  * adjust the list position of the top transaction in the list by
1166  * moving it to where the subtransaction is.
1167  */
1168  txn->base_snapshot = subtxn->base_snapshot;
1169  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1171  &txn->base_snapshot_node);
1172 
1173  /*
1174  * The subtransaction doesn't have a snapshot anymore (so it
1175  * mustn't be in the list.)
1176  */
1177  subtxn->base_snapshot = NULL;
1179  dlist_delete(&subtxn->base_snapshot_node);
1180  }
1181  else
1182  {
1183  /* Base snap of toplevel is fine, so subxact's is not needed */
1185  dlist_delete(&subtxn->base_snapshot_node);
1186  subtxn->base_snapshot = NULL;
1188  }
1189  }
1190 }
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

static void ReorderBufferTruncateTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
bool  txn_prepared 
)
static

Definition at line 1633 of file reorderbuffer.c.

1634 {
1635  dlist_mutable_iter iter;
1636  Size mem_freed = 0;
1637 
1638  /* cleanup subtransactions & their changes */
1639  dlist_foreach_modify(iter, &txn->subtxns)
1640  {
1641  ReorderBufferTXN *subtxn;
1642 
1643  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1644 
1645  /*
1646  * Subtransactions are always associated to the toplevel TXN, even if
1647  * they originally were happening inside another subtxn, so we won't
1648  * ever recurse more than one level deep here.
1649  */
1650  Assert(rbtxn_is_known_subxact(subtxn));
1651  Assert(subtxn->nsubtxns == 0);
1652 
1653  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1654  }
1655 
1656  /* cleanup changes in the txn */
1657  dlist_foreach_modify(iter, &txn->changes)
1658  {
1659  ReorderBufferChange *change;
1660 
1661  change = dlist_container(ReorderBufferChange, node, iter.cur);
1662 
1663  /* Check we're not mixing changes from different transactions. */
1664  Assert(change->txn == txn);
1665 
1666  /* remove the change from its containing list */
1667  dlist_delete(&change->node);
1668 
1669  /*
1670  * Instead of updating the memory counter for individual changes, we
1671  * sum up the size of memory to free so we can update the memory
1672  * counter all together below. This saves costs of maintaining the
1673  * max-heap.
1674  */
1675  mem_freed += ReorderBufferChangeSize(change);
1676 
1677  ReorderBufferReturnChange(rb, change, false);
1678  }
1679 
1680  /* Update the memory counter */
1681  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1682 
1683  /*
1684  * Mark the transaction as streamed.
1685  *
1686  * The top-level transaction is always marked as streamed, even if it
1687  * does not contain any changes (that is, when all the changes are in
1688  * subtransactions).
1689  *
1690  * For subtransactions, we only mark them as streamed when there are
1691  * changes in them.
1692  *
1693  * We do it this way because of aborts - we don't want to send aborts for
1694  * XIDs the downstream is not aware of. And of course, it always knows
1695  * about the toplevel xact (we send the XID in all messages), but we never
1696  * stream XIDs of empty subxacts.
1697  */
1698  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1699  txn->txn_flags |= RBTXN_IS_STREAMED;
1700 
1701  if (txn_prepared)
1702  {
1703  /*
1704  * If this is a prepared txn, cleanup the tuplecids we stored for
1705  * decoding catalog snapshot access. They are always stored in the
1706  * toplevel transaction.
1707  */
1708  dlist_foreach_modify(iter, &txn->tuplecids)
1709  {
1710  ReorderBufferChange *change;
1711 
1712  change = dlist_container(ReorderBufferChange, node, iter.cur);
1713 
1714  /* Check we're not mixing changes from different transactions. */
1715  Assert(change->txn == txn);
1717 
1718  /* Remove the change from its containing list. */
1719  dlist_delete(&change->node);
1720 
1721  ReorderBufferReturnChange(rb, change, true);
1722  }
1723  }
1724 
1725  /*
1726  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1727  * memory. We could also keep the hash table and update it with new ctid
1728  * values, but this seems simpler and good enough for now.
1729  */
1730  if (txn->tuplecid_hash != NULL)
1731  {
1733  txn->tuplecid_hash = NULL;
1734  }
1735 
1736  /* If this txn is serialized then clean the disk space. */
1737  if (rbtxn_is_serialized(txn))
1738  {
1739  ReorderBufferRestoreCleanup(rb, txn);
1740  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1741 
1742  /*
1743  * We set this flag to indicate if the transaction is ever serialized.
1744  * We need this to accurately update the stats as otherwise the same
1745  * transaction can be counted as serialized multiple times.
1746  */
1748  }
1749 
1750  /* also reset the number of entries in the transaction */
1751  txn->nentries_mem = 0;
1752  txn->nentries = 0;
1753 }
#define RBTXN_IS_STREAMED
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, RBTXN_IS_STREAMED, rbtxn_is_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferStreamCommit().

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer * rb,
TransactionId  xid,
bool  create,
bool * is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 631 of file reorderbuffer.c.

633 {
634  ReorderBufferTXN *txn;
636  bool found;
637 
639 
640  /*
641  * Check the one-entry lookup cache first
642  */
644  rb->by_txn_last_xid == xid)
645  {
646  txn = rb->by_txn_last_txn;
647 
648  if (txn != NULL)
649  {
650  /* found it, and it's valid */
651  if (is_new)
652  *is_new = false;
653  return txn;
654  }
655 
656  /*
657  * cached as non-existent, and asked not to create? Then nothing else
658  * to do.
659  */
660  if (!create)
661  return NULL;
662  /* otherwise fall through to create it */
663  }
664 
665  /*
666  * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
667  * create an entry.
668  */
669 
670  /* search the lookup table */
671  ent = (ReorderBufferTXNByIdEnt *)
672  hash_search(rb->by_txn,
673  &xid,
674  create ? HASH_ENTER : HASH_FIND,
675  &found);
676  if (found)
677  txn = ent->txn;
678  else if (create)
679  {
680  /* initialize the new entry, if creation was requested */
681  Assert(ent != NULL);
682  Assert(lsn != InvalidXLogRecPtr);
683 
684  ent->txn = ReorderBufferGetTXN(rb);
685  ent->txn->xid = xid;
686  txn = ent->txn;
687  txn->first_lsn = lsn;
689 
690  if (create_as_top)
691  {
692  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
693  AssertTXNLsnOrder(rb);
694  }
695  }
696  else
697  txn = NULL; /* not found and not asked to create */
698 
699  /* update cache */
700  rb->by_txn_last_xid = xid;
701  rb->by_txn_last_txn = txn;
702 
703  if (is_new)
704  *is_new = !found;
705 
706  Assert(!create || txn != NULL);
707  return txn;
708 }
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
ReorderBufferTXN * txn
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferTXNSizeCompare()

static int ReorderBufferTXNSizeCompare ( const pairingheap_node *  a,
const pairingheap_node *  b,
void *  arg 
)
static

Definition at line 3537 of file reorderbuffer.c.

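3538 {
3539  const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
3540  const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);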
3541 
3542  if (ta->size < tb->size)
3543  return -1;
3544  if (ta->size > tb->size)
3545  return 1;
3546  return 0;
3547 }
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and ReorderBufferTXN::size.

Referenced by ReorderBufferAllocate().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer *  rb,
TransactionId  xid 
)

Definition at line 3490 of file reorderbuffer.c.

3491 {
3492  ReorderBufferTXN *txn;
3493 
3494  txn = ReorderBufferTXNByXid(rb, xid, false,
3495  NULL, InvalidXLogRecPtr, false);
3496 
3497  /* transaction isn't known yet, ergo no snapshot */
3498  if (txn == NULL)
3499  return false;
3500 
3501  /* a known subtxn? operate on top-level txn instead */
3502  if (rbtxn_is_known_subxact(txn))
3503  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3504  NULL, InvalidXLogRecPtr, false);
3505 
3506  return txn->base_snapshot != NULL;
3507 }

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer *  rb,
TransactionId  xid 
)

Definition at line 3473 of file reorderbuffer.c.

3474 {
3475  ReorderBufferTXN *txn;
3476 
3477  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3478  false);
3479  if (txn == NULL)
3480  return false;
3481 
3482  return rbtxn_has_catalog_changes(txn);
3483 }

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3400 of file reorderbuffer.c.

3402 {
3403  ReorderBufferTXN *txn;
3404 
3405  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3406 
3407  if (!rbtxn_has_catalog_changes(txn))
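3408  {
3409  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3410  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);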
3411  }
3412 
3413  /*
3414  * Mark top-level transaction as having catalog changes too if one of its
3415  * children has so that the ReorderBufferBuildTupleCidHash can
3416  * conveniently check just top-level transaction and decide whether to
3417  * build the hash table or not.
3418  */
3419  if (rbtxn_is_subtxn(txn))
3420  {
3421  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3422 
3423  if (!rbtxn_has_catalog_changes(toptxn))
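3424  {
3425  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3426  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);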
3427  }
3428  }
3429 }
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by heap_decode(), SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB *  tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId *  cmin,
CommandId *  cmax 
)

Definition at line 5275 of file reorderbuffer.c.

5279 {
5280  ReorderBufferTupleCidKey key;
5281  ReorderBufferTupleCidEnt *ent;
5282  ForkNumber forkno;
5283  BlockNumber blockno;
5284  bool updated_mapping = false;
5285 
5286  /*
5287  * Return unresolved if tuplecid_data is not valid. That's because when
5288  * streaming in-progress transactions we may run into tuples with the CID
5289  * before actually decoding them. Think e.g. about INSERT followed by
5290  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5291  * INSERT. So in such cases, we assume the CID is from the future
5292  * command.
5293  */
5294  if (tuplecid_data == NULL)
5295  return false;
5296 
5297  /* be careful about padding */
5298  memset(&key, 0, sizeof(key));
5299 
5300  Assert(!BufferIsLocal(buffer));
5301 
5302  /*
5303  * get relfilelocator from the buffer, no convenient way to access it
5304  * other than that.
5305  */
5306  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5307 
5308  /* tuples can only be in the main fork */
5309  Assert(forkno == MAIN_FORKNUM);
5310  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5311 
5312  ItemPointerCopy(&htup->t_self,
5313  &key.tid);
5314 
5315 restart:
5316  ent = (ReorderBufferTupleCidEnt *)
5317  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5318 
5319  /*
5320  * failed to find a mapping, check whether the table was rewritten and
5321  * apply mapping if so, but only do that once - there can be no new
5322  * mappings while we are in here since we have to hold a lock on the
5323  * relation.
5324  */
5325  if (ent == NULL && !updated_mapping)
5326  {
5327  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5328  /* now check but don't update for a mapping again */
5329  updated_mapping = true;
5330  goto restart;
5331  }
5332  else if (ent == NULL)
5333  return false;
5334 
5335  if (cmin)
5336  *cmin = ent->cmin;
5337  if (cmax)
5338  *cmax = ent->cmax;
5339  return true;
5340 }
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:3745
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:56
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), sort-test::key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inline static

Definition at line 1971 of file reorderbuffer.c.

1972 {
1973  /*
1974  * If the input transaction id is already set as a CheckXidAlive then
1975  * nothing to do.
1976  */
1977  if (TransactionIdEquals(CheckXidAlive, xid))
1978  return;
1979 
1980  /*
1981  * setup CheckXidAlive if it's not committed yet. We don't check if the
1982  * xid is aborted. That will happen during catalog access.
1983  */
1984  if (!TransactionIdDidCommit(xid))
1985  CheckXidAlive = xid;
1986  else
1987  CheckXidAlive = InvalidTransactionId;
1988 }
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4655 of file reorderbuffer.c.

4656 {
4657  DIR *logical_dir;
4658  struct dirent *logical_de;
4659 
4660  logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4661  while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4662  {
4663  if (strcmp(logical_de->d_name, ".") == 0 ||
4664  strcmp(logical_de->d_name, "..") == 0)
4665  continue;
4666 
4667  /* if it cannot be a slot, skip the directory */
4668  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4669  continue;
4670 
4671  /*
4672  * ok, has to be a surviving logical slot, iterate and delete
4673  * everything starting with xid-*
4674  */
4675  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4676  }
4677  FreeDir(logical_dir);
4678 }
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2932
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId *  xip,
Size  num 
)
static

Definition at line 5174 of file reorderbuffer.c.

5175 {
5176  return bsearch(&xid, xip, num,
5177  sizeof(TransactionId), xidComparator) != NULL;
5178 }

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB *  tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 5197 of file reorderbuffer.c.

5198 {
5199  DIR *mapping_dir;
5200  struct dirent *mapping_de;
5201  List *files = NIL;
5202  ListCell *file;
5203  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5204 
5205  mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5206  while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5207  {
5208  Oid f_dboid;
5209  Oid f_relid;
5210  TransactionId f_mapped_xid;
5211  TransactionId f_create_xid;
5212  XLogRecPtr f_lsn;
5213  uint32 f_hi,
5214  f_lo;
5215  RewriteMappingFile *f;
5216 
5217  if (strcmp(mapping_de->d_name, ".") == 0 ||
5218  strcmp(mapping_de->d_name, "..") == 0)
5219  continue;
5220 
5221  /* Ignore files that aren't ours */
5222  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5223  continue;
5224 
5225  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5226  &f_dboid, &f_relid, &f_hi, &f_lo,
5227  &f_mapped_xid, &f_create_xid) != 6)
5228  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5229 
5230  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5231 
5232  /* mapping for another database */
5233  if (f_dboid != dboid)
5234  continue;
5235 
5236  /* mapping for another relation */
5237  if (f_relid != relid)
5238  continue;
5239 
5240  /* did the creating transaction abort? */
5241  if (!TransactionIdDidCommit(f_create_xid))
5242  continue;
5243 
5244  /* not for our transaction */
5245  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5246  continue;
5247 
5248  /* ok, relevant, queue for apply */
5249  f = palloc(sizeof(RewriteMappingFile));
5250  f->lsn = f_lsn;
5251  strcpy(f->fname, mapping_de->d_name);
5252  files = lappend(files, f);
5253  }
5254  FreeDir(mapping_dir);
5255 
5256  /* sort files so we apply them in LSN order */
5257  list_sort(files, file_sort_by_lsn);
5258 
5259  foreach(file, files)
5260  {
5261  RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5262 
5263  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5264  snapshot->subxip[0]);
5265  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5266  pfree(f);
5267  }
5268 }
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:273
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:93
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
List * lappend(List *list, void *datum)
Definition: list.c:339
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), PG_LOGICAL_MAPPINGS_DIR, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

Definition at line 216 of file reorderbuffer.c.

Referenced by pa_send_data(), and ReorderBufferCheckMemoryLimit().

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 212 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 213 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().