PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 193 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
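( \
 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)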
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 188 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 184 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5077 of file reorderbuffer.c.

5078 {
5079  char path[MAXPGPATH];
5080  int fd;
5081  int readBytes;
5083 
5084  sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5085  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5086  if (fd < 0)
5087  ereport(ERROR,
5089  errmsg("could not open file \"%s\": %m", path)));
5090 
5091  while (true)
5092  {
5095  ReorderBufferTupleCidEnt *new_ent;
5096  bool found;
5097 
5098  /* be careful about padding */
5099  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5100 
5101  /* read all mappings till the end of the file */
5102  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5103  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5105 
5106  if (readBytes < 0)
5107  ereport(ERROR,
5109  errmsg("could not read file \"%s\": %m",
5110  path)));
5111  else if (readBytes == 0) /* EOF */
5112  break;
5113  else if (readBytes != sizeof(LogicalRewriteMappingData))
5114  ereport(ERROR,
5116  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5117  path, readBytes,
5118  (int32) sizeof(LogicalRewriteMappingData))));
5119 
5120  key.rlocator = map.old_locator;
5121  ItemPointerCopy(&map.old_tid,
5122  &key.tid);
5123 
5124 
5125  ent = (ReorderBufferTupleCidEnt *)
5127 
5128  /* no existing mapping, no need to update */
5129  if (!ent)
5130  continue;
5131 
5132  key.rlocator = map.new_locator;
5133  ItemPointerCopy(&map.new_tid,
5134  &key.tid);
5135 
5136  new_ent = (ReorderBufferTupleCidEnt *)
5138 
5139  if (found)
5140  {
5141  /*
5142  * Make sure the existing mapping makes sense. We sometimes update
5143  * old records that did not yet have a cmax (e.g. pg_class' own
5144  * entry while rewriting it) during rewrites, so allow that.
5145  */
5146  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5147  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5148  }
5149  else
5150  {
5151  /* update mapping */
5152  new_ent->cmin = ent->cmin;
5153  new_ent->cmax = ent->cmax;
5154  new_ent->combocid = ent->combocid;
5155  }
5156  }
5157 
5158  if (CloseTransientFile(fd) != 0)
5159  ereport(ERROR,
5161  errmsg("could not close file \"%s\": %m", path)));
5162 }
#define InvalidCommandId
Definition: c.h:672
signed int int32
Definition: c.h:497
#define Assert(condition)
Definition: c.h:861
#define PG_BINARY
Definition: c.h:1276
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2832
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2656
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:102
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 986 of file reorderbuffer.c.

987 {
988 #ifdef USE_ASSERT_CHECKING
989  dlist_iter iter;
990  XLogRecPtr prev_lsn = txn->first_lsn;
991 
992  dlist_foreach(iter, &txn->changes)
993  {
994  ReorderBufferChange *cur_change;
995 
996  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
997 
999  Assert(cur_change->lsn != InvalidXLogRecPtr);
1000  Assert(txn->first_lsn <= cur_change->lsn);
1001 
1002  if (txn->end_lsn != InvalidXLogRecPtr)
1003  Assert(cur_change->lsn <= txn->end_lsn);
1004 
1005  Assert(prev_lsn <= cur_change->lsn);
1006 
1007  prev_lsn = cur_change->lsn;
1008  }
1009 #endif
1010 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 915 of file reorderbuffer.c.

916 {
917 #ifdef USE_ASSERT_CHECKING
919  dlist_iter iter;
920  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
921  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
922 
923  /*
924  * Skip the verification if we don't reach the LSN at which we start
925  * decoding the contents of transactions yet because until we reach the
926  * LSN, we could have transactions that don't have the association between
927  * the top-level transaction and subtransaction yet and consequently have
928  * the same LSN. We don't guarantee this association until we try to
929  * decode the actual contents of transaction. The ordering of the records
930  * prior to the start_decoding_at LSN should have been checked before the
931  * restart.
932  */
934  return;
935 
936  dlist_foreach(iter, &rb->toplevel_by_lsn)
937  {
939  iter.cur);
940 
941  /* start LSN must be set */
942  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
943 
944  /* If there is an end LSN, it must be higher than start LSN */
945  if (cur_txn->end_lsn != InvalidXLogRecPtr)
946  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
947 
948  /* Current initial LSN must be strictly higher than previous */
949  if (prev_first_lsn != InvalidXLogRecPtr)
950  Assert(prev_first_lsn < cur_txn->first_lsn);
951 
952  /* known-as-subtxn txns must not be listed */
953  Assert(!rbtxn_is_known_subxact(cur_txn));
954 
955  prev_first_lsn = cur_txn->first_lsn;
956  }
957 
959  {
961  base_snapshot_node,
962  iter.cur);
963 
964  /* base snapshot (and its LSN) must be set */
965  Assert(cur_txn->base_snapshot != NULL);
967 
968  /* current LSN must be strictly higher than previous */
969  if (prev_base_snap_lsn != InvalidXLogRecPtr)
970  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
971 
972  /* known-as-subtxn txns must not be listed */
973  Assert(!rbtxn_is_known_subxact(cur_txn));
974 
975  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
976  }
977 #endif
978 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:443
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 5179 of file reorderbuffer.c.

5180 {
5183 
5184  return pg_cmp_u64(a->lsn, b->lsn);
5185 }
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:616
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2926 of file reorderbuffer.c.

2928 {
2929  ReorderBufferTXN *txn;
2930 
2931  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2932  false);
2933 
2934  /* unknown, nothing to remove */
2935  if (txn == NULL)
2936  return;
2937 
2938  txn->xact_time.abort_time = abort_time;
2939 
2940  /* For streamed transactions notify the remote node about the abort. */
2941  if (rbtxn_is_streamed(txn))
2942  {
2943  rb->stream_abort(rb, txn, lsn);
2944 
2945  /*
2946  * We might have decoded changes for this transaction that could load
2947  * the cache as per the current transaction's view (consider DDL's
2948  * happened in this transaction). We don't want the decoding of future
2949  * transactions to use those cache entries so execute invalidations.
2950  */
2951  if (txn->ninvalidations > 0)
2953  txn->invalidations);
2954  }
2955 
2956  /* cosmetic... */
2957  txn->final_lsn = lsn;
2958 
2959  /* remove potential on-disk data, and deallocate */
2960  ReorderBufferCleanupTXN(rb, txn);
2961 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
union ReorderBufferTXN::@113 xact_time
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2971 of file reorderbuffer.c.

2972 {
2973  dlist_mutable_iter it;
2974 
2975  /*
2976  * Iterate through all (potential) toplevel TXNs and abort all that are
2977  * older than what possibly can be running. Once we've found the first
2978  * that is alive we stop, there might be some that acquired an xid earlier
2979  * but started writing later, but it's unlikely and they will be cleaned
2980  * up in a later call to this function.
2981  */
2983  {
2984  ReorderBufferTXN *txn;
2985 
2986  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2987 
2988  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2989  {
2990  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2991 
2992  /* Notify the remote node about the crash/immediate restart. */
2993  if (rbtxn_is_streamed(txn))
2994  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2995 
2996  /* remove potential on-disk data, and deallocate this tx */
2997  ReorderBufferCleanupTXN(rb, txn);
2998  }
2999  else
3000  return;
3001  }
3002 }
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:225
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3324 of file reorderbuffer.c.

3327 {
3328  ReorderBufferTXN *txn;
3329  MemoryContext oldcontext;
3330  ReorderBufferChange *change;
3331 
3332  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3333 
3334  oldcontext = MemoryContextSwitchTo(rb->context);
3335 
3336  /*
3337  * Collect all the invalidations under the top transaction, if available,
3338  * so that we can execute them all together. See comments atop this
3339  * function.
3340  */
3341  txn = rbtxn_get_toptxn(txn);
3342 
3343  Assert(nmsgs > 0);
3344 
3345  /* Accumulate invalidations. */
3346  if (txn->ninvalidations == 0)
3347  {
3348  txn->ninvalidations = nmsgs;
3350  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3351  memcpy(txn->invalidations, msgs,
3352  sizeof(SharedInvalidationMessage) * nmsgs);
3353  }
3354  else
3355  {
3358  (txn->ninvalidations + nmsgs));
3359 
3360  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3361  nmsgs * sizeof(SharedInvalidationMessage));
3362  txn->ninvalidations += nmsgs;
3363  }
3364 
3365  change = ReorderBufferGetChange(rb);
3367  change->data.inval.ninvalidations = nmsgs;
3368  change->data.inval.invalidations = (SharedInvalidationMessage *)
3369  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3370  memcpy(change->data.inval.invalidations, msgs,
3371  sizeof(SharedInvalidationMessage) * nmsgs);
3372 
3373  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3374 
3375  MemoryContextSwitchTo(oldcontext);
3376 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContextSwitchTo(old_ctx)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
struct ReorderBufferChange::@107::@112 inval
union ReorderBufferChange::@107 data
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3287 of file reorderbuffer.c.

3291 {
3293  ReorderBufferTXN *txn;
3294 
3295  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3296 
3297  change->data.tuplecid.locator = locator;
3298  change->data.tuplecid.tid = tid;
3299  change->data.tuplecid.cmin = cmin;
3300  change->data.tuplecid.cmax = cmax;
3301  change->data.tuplecid.combocid = combocid;
3302  change->lsn = lsn;
3303  change->txn = txn;
3305 
3306  dlist_push_tail(&txn->tuplecids, &change->node);
3307  txn->ntuplecids++;
3308 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
struct ReorderBufferChange::@107::@111 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 309 of file reorderbuffer.c.

310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
319  "ReorderBuffer",
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * XXX the allocation sizes used below pre-date generation context's block
341  * growing code. These values should likely be benchmarked and set to
342  * more suitable values.
343  */
344  buffer->tup_context = GenerationContextCreate(new_ctx,
345  "Tuples",
349 
350  hash_ctl.keysize = sizeof(TransactionId);
351  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
352  hash_ctl.hcxt = buffer->context;
353 
354  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
356 
358  buffer->by_txn_last_txn = NULL;
359 
360  buffer->outbuf = NULL;
361  buffer->outbufsize = 0;
362  buffer->size = 0;
363 
364  /* txn_heap is ordered by transaction size */
366 
367  buffer->spillTxns = 0;
368  buffer->spillCount = 0;
369  buffer->spillBytes = 0;
370  buffer->streamTxns = 0;
371  buffer->streamCount = 0;
372  buffer->streamBytes = 0;
373  buffer->totalTxns = 0;
374  buffer->totalBytes = 0;
375 
377 
378  dlist_init(&buffer->toplevel_by_lsn);
380  dclist_init(&buffer->catchange_txns);
381 
382  /*
383  * Ensure there's no stale data from prior uses of this slot, in case some
384  * prior exit avoided calling ReorderBufferFree. Failure to do this can
385  * produce duplicated txns, and it's very cheap if there's nothing there.
386  */
388 
389  return buffer;
390 }
#define NameStr(name)
Definition: c.h:749
uint32 TransactionId
Definition: c.h:655
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:190
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:181
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1989 of file reorderbuffer.c.

1992 {
1993  if (streaming)
1994  rb->stream_change(rb, txn, relation, change);
1995  else
1996  rb->apply_change(rb, txn, relation, change);
1997 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 2017 of file reorderbuffer.c.

2019 {
2020  if (streaming)
2021  rb->stream_message(rb, txn, change->lsn, true,
2022  change->data.msg.prefix,
2023  change->data.msg.message_size,
2024  change->data.msg.message);
2025  else
2026  rb->message(rb, txn, change->lsn, true,
2027  change->data.msg.prefix,
2028  change->data.msg.message_size,
2029  change->data.msg.message);
2030 }
struct ReorderBufferChange::@107::@110 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  nrelations,
Relation *  relations,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 2003 of file reorderbuffer.c.

2006 {
2007  if (streaming)
2008  rb->stream_truncate(rb, txn, nrelations, relations, change);
2009  else
2010  rb->apply_truncate(rb, txn, nrelations, relations, change);
2011 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1072 of file reorderbuffer.c.

1074 {
1075  ReorderBufferTXN *txn;
1076  ReorderBufferTXN *subtxn;
1077  bool new_top;
1078  bool new_sub;
1079 
1080  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1081  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1082 
1083  if (!new_sub)
1084  {
1085  if (rbtxn_is_known_subxact(subtxn))
1086  {
1087  /* already associated, nothing to do */
1088  return;
1089  }
1090  else
1091  {
1092  /*
1093  * We already saw this transaction, but initially added it to the
1094  * list of top-level txns. Now that we know it's not top-level,
1095  * remove it from there.
1096  */
1097  dlist_delete(&subtxn->node);
1098  }
1099  }
1100 
1101  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1102  subtxn->toplevel_xid = xid;
1103  Assert(subtxn->nsubtxns == 0);
1104 
1105  /* set the reference to top-level transaction */
1106  subtxn->toptxn = txn;
1107 
1108  /* add to subtransaction list */
1109  dlist_push_tail(&txn->subtxns, &subtxn->node);
1110  txn->nsubtxns++;
1111 
1112  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1113  ReorderBufferTransferSnapToParent(txn, subtxn);
1114 
1115  /* Verify LSN-ordering invariant */
1116  AssertTXNLsnOrder(rb);
1117 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1755 of file reorderbuffer.c.

1756 {
1757  dlist_iter iter;
1758  HASHCTL hash_ctl;
1759 
1761  return;
1762 
1763  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1764  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1765  hash_ctl.hcxt = rb->context;
1766 
1767  /*
1768  * create the hash with the exact number of to-be-stored tuplecids from
1769  * the start
1770  */
1771  txn->tuplecid_hash =
1772  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1774 
1775  dlist_foreach(iter, &txn->tuplecids)
1776  {
1779  bool found;
1780  ReorderBufferChange *change;
1781 
1782  change = dlist_container(ReorderBufferChange, node, iter.cur);
1783 
1785 
1786  /* be careful about padding */
1787  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1788 
1789  key.rlocator = change->data.tuplecid.locator;
1790 
1791  ItemPointerCopy(&change->data.tuplecid.tid,
1792  &key.tid);
1793 
1794  ent = (ReorderBufferTupleCidEnt *)
1795  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1796  if (!found)
1797  {
1798  ent->cmin = change->data.tuplecid.cmin;
1799  ent->cmax = change->data.tuplecid.cmax;
1800  ent->combocid = change->data.tuplecid.combocid;
1801  }
1802  else
1803  {
1804  /*
1805  * Maybe we already saw this tuple before in this transaction, but
1806  * if so it must have the same cmin.
1807  */
1808  Assert(ent->cmin == change->data.tuplecid.cmin);
1809 
1810  /*
1811  * cmax may be initially invalid, but once set it can only grow,
1812  * and never become invalid again.
1813  */
1814  Assert((ent->cmax == InvalidCommandId) ||
1815  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1816  (change->data.tuplecid.cmax > ent->cmax)));
1817  ent->cmax = change->data.tuplecid.cmax;
1818  }
1819  }
1820 }
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *  rb)
inline static

Definition at line 4024 of file reorderbuffer.c.

4025 {
4027  SnapBuild *builder = ctx->snapshot_builder;
4028 
4029  /* We can't start streaming unless a consistent state is reached. */
4031  return false;
4032 
4033  /*
4034  * We can't start streaming immediately even if the streaming is enabled
4035  * because we previously decoded this transaction and now just are
4036  * restarting.
4037  */
4038  if (ReorderBufferCanStream(rb) &&
4039  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4040  return true;
4041 
4042  return false;
4043 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:416
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *  rb)
inline static

Definition at line 4015 of file reorderbuffer.c.

4016 {
4018 
4019  return ctx->streaming;
4020 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
ReorderBufferTXN *  txn,
bool  addition,
Size  sz 
)
static

Definition at line 3216 of file reorderbuffer.c.

3220 {
3221  ReorderBufferTXN *toptxn;
3222 
3223  Assert(txn || change);
3224 
3225  /*
3226  * Ignore tuple CID changes, because those are not evicted when reaching
3227  * memory limit. So we just don't count them, because it might easily
3228  * trigger a pointless attempt to spill.
3229  */
3230  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3231  return;
3232 
3233  if (sz == 0)
3234  return;
3235 
3236  if (txn == NULL)
3237  txn = change->txn;
3238  Assert(txn != NULL);
3239 
3240  /*
3241  * Update the total size in top level as well. This is later used to
3242  * compute the decoding stats.
3243  */
3244  toptxn = rbtxn_get_toptxn(txn);
3245 
3246  if (addition)
3247  {
3248  Size oldsize = txn->size;
3249 
3250  txn->size += sz;
3251  rb->size += sz;
3252 
3253  /* Update the total size in the top transaction. */
3254  toptxn->total_size += sz;
3255 
3256  /* Update the max-heap */
3257  if (oldsize != 0)
3258  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3259  pairingheap_add(rb->txn_heap, &txn->txn_node);
3260  }
3261  else
3262  {
3263  Assert((rb->size >= sz) && (txn->size >= sz));
3264  txn->size -= sz;
3265  rb->size -= sz;
3266 
3267  /* Update the total size in the top transaction. */
3268  toptxn->total_size -= sz;
3269 
3270  /* Update the max-heap */
3271  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3272  if (txn->size != 0)
3273  pairingheap_add(rb->txn_heap, &txn->txn_node);
3274  }
3275 
3276  Assert(txn->size <= rb->size);
3277 }
size_t Size
Definition: c.h:608
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node txn_node

References ReorderBufferChange::action, Assert, pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *  change)
static

Definition at line 4167 of file reorderbuffer.c.

4168 {
4169  Size sz = sizeof(ReorderBufferChange);
4170 
4171  switch (change->action)
4172  {
4173  /* fall through these, they're all similar enough */
4178  {
4179  HeapTuple oldtup,
4180  newtup;
4181  Size oldlen = 0;
4182  Size newlen = 0;
4183 
4184  oldtup = change->data.tp.oldtuple;
4185  newtup = change->data.tp.newtuple;
4186 
4187  if (oldtup)
4188  {
4189  sz += sizeof(HeapTupleData);
4190  oldlen = oldtup->t_len;
4191  sz += oldlen;
4192  }
4193 
4194  if (newtup)
4195  {
4196  sz += sizeof(HeapTupleData);
4197  newlen = newtup->t_len;
4198  sz += newlen;
4199  }
4200 
4201  break;
4202  }
4204  {
4205  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4206 
4207  sz += prefix_size + change->data.msg.message_size +
4208  sizeof(Size) + sizeof(Size);
4209 
4210  break;
4211  }
4213  {
4214  sz += sizeof(SharedInvalidationMessage) *
4215  change->data.inval.ninvalidations;
4216  break;
4217  }
4219  {
4220  Snapshot snap;
4221 
4222  snap = change->data.snapshot;
4223 
4224  sz += sizeof(SnapshotData) +
4225  sizeof(TransactionId) * snap->xcnt +
4226  sizeof(TransactionId) * snap->subxcnt;
4227 
4228  break;
4229  }
4231  {
4232  sz += sizeof(Oid) * change->data.truncate.nrelids;
4233 
4234  break;
4235  }
4240  /* ReorderBufferChange contains everything important */
4241  break;
4242  }
4243 
4244  return sz;
4245 }
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@107::@109 truncate
struct ReorderBufferChange::@107::@108 tp
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *  rb)
static

Definition at line 3632 of file reorderbuffer.c.

3633 {
3634  ReorderBufferTXN *txn;
3635 
3636  /*
3637  * Bail out if debug_logical_replication_streaming is buffered and we
3638  * haven't exceeded the memory limit.
3639  */
3641  rb->size < logical_decoding_work_mem * 1024L)
3642  return;
3643 
3644  /*
3645  * If debug_logical_replication_streaming is immediate, loop until there's
3646  * no change. Otherwise, loop until we reach under the memory limit. One
3647  * might think that just by evicting the largest (sub)transaction we will
3648  * come under the memory limit based on assumption that the selected
3649  * transaction is at least as large as the most recent change (which
3650  * caused us to go over the memory limit). However, that is not true
3651  * because a user can reduce the logical_decoding_work_mem to a smaller
3652  * value before the most recent change.
3653  */
3654  while (rb->size >= logical_decoding_work_mem * 1024L ||
3656  rb->size > 0))
3657  {
3658  /*
3659  * Pick the largest transaction and evict it from memory by streaming,
3660  * if possible. Otherwise, spill to disk.
3661  */
3663  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3664  {
3665  /* we know there has to be one, because the size is not zero */
3666  Assert(txn && rbtxn_is_toptxn(txn));
3667  Assert(txn->total_size > 0);
3668  Assert(rb->size >= txn->total_size);
3669 
3670  ReorderBufferStreamTXN(rb, txn);
3671  }
3672  else
3673  {
3674  /*
3675  * Pick the largest transaction (or subtransaction) and evict it
3676  * from memory by serializing it to disk.
3677  */
3678  txn = ReorderBufferLargestTXN(rb);
3679 
3680  /* we know there has to be one, because the size is not zero */
3681  Assert(txn);
3682  Assert(txn->size > 0);
3683  Assert(rb->size >= txn->size);
3684 
3685  ReorderBufferSerializeTXN(rb, txn);
3686  }
3687 
3688  /*
3689  * After eviction, the transaction should have no entries in memory,
3690  * and should use 0 bytes for changes.
3691  */
3692  Assert(txn->size == 0);
3693  Assert(txn->nentries_mem == 0);
3694  }
3695 
3696  /* We must be under the memory limit now. */
3697  Assert(rb->size < logical_decoding_work_mem * 1024L);
3698 
3699 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert, DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4593 of file reorderbuffer.c.

4594 {
4595  DIR *spill_dir;
4596  struct dirent *spill_de;
4597  struct stat statbuf;
4598  char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4599 
4600  sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4601 
4602  /* we're only handling directories here, skip if it's not ours */
4603  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4604  return;
4605 
4606  spill_dir = AllocateDir(path);
4607  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4608  {
4609  /* only look at names that can be ours */
4610  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4611  {
4612  snprintf(path, sizeof(path),
4613  "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4614  spill_de->d_name);
4615 
4616  if (unlink(path) != 0)
4617  ereport(ERROR,
4619  errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4620  path, PG_REPLSLOT_DIR, slotname)));
4621  }
4622  }
4623  FreeDir(spill_dir);
4624 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2984
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2947
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2866
#define snprintf
Definition: port.h:238
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1508 of file reorderbuffer.c.

1509 {
1510  bool found;
1511  dlist_mutable_iter iter;
1512  Size mem_freed = 0;
1513 
1514  /* cleanup subtransactions & their changes */
1515  dlist_foreach_modify(iter, &txn->subtxns)
1516  {
1517  ReorderBufferTXN *subtxn;
1518 
1519  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1520 
1521  /*
1522  * Subtransactions are always associated to the toplevel TXN, even if
1523  * they originally were happening inside another subtxn, so we won't
1524  * ever recurse more than one level deep here.
1525  */
1526  Assert(rbtxn_is_known_subxact(subtxn));
1527  Assert(subtxn->nsubtxns == 0);
1528 
1529  ReorderBufferCleanupTXN(rb, subtxn);
1530  }
1531 
1532  /* cleanup changes in the txn */
1533  dlist_foreach_modify(iter, &txn->changes)
1534  {
1535  ReorderBufferChange *change;
1536 
1537  change = dlist_container(ReorderBufferChange, node, iter.cur);
1538 
1539  /* Check we're not mixing changes from different transactions. */
1540  Assert(change->txn == txn);
1541 
1542  /*
1543  * Instead of updating the memory counter for individual changes, we
1544  * sum up the size of memory to free so we can update the memory
1545  * counter all together below. This saves costs of maintaining the
1546  * max-heap.
1547  */
1548  mem_freed += ReorderBufferChangeSize(change);
1549 
1550  ReorderBufferReturnChange(rb, change, false);
1551  }
1552 
1553  /* Update the memory counter */
1554  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1555 
1556  /*
1557  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1558  * They are always stored in the toplevel transaction.
1559  */
1560  dlist_foreach_modify(iter, &txn->tuplecids)
1561  {
1562  ReorderBufferChange *change;
1563 
1564  change = dlist_container(ReorderBufferChange, node, iter.cur);
1565 
1566  /* Check we're not mixing changes from different transactions. */
1567  Assert(change->txn == txn);
1569 
1570  ReorderBufferReturnChange(rb, change, true);
1571  }
1572 
1573  /*
1574  * Cleanup the base snapshot, if set.
1575  */
1576  if (txn->base_snapshot != NULL)
1577  {
1580  }
1581 
1582  /*
1583  * Cleanup the snapshot for the last streamed run.
1584  */
1585  if (txn->snapshot_now != NULL)
1586  {
1587  Assert(rbtxn_is_streamed(txn));
1589  }
1590 
1591  /*
1592  * Remove TXN from its containing lists.
1593  *
1594  * Note: if txn is known as subxact, we are deleting the TXN from its
1595  * parent's list of known subxacts; this leaves the parent's nsubtxns
1596  * count too high, but we don't care. Otherwise, we are deleting the TXN
1597  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1598  * list of catalog modifying transactions as well.
1599  */
1600  dlist_delete(&txn->node);
1601  if (rbtxn_has_catalog_changes(txn))
1603 
1604  /* now remove reference from buffer */
1605  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1606  Assert(found);
1607 
1608  /* remove entries spilled to disk */
1609  if (rbtxn_is_serialized(txn))
1610  ReorderBufferRestoreCleanup(rb, txn);
1611 
1612  /* deallocate */
1613  ReorderBufferReturnTXN(rb, txn);
1614 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:467
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2735 of file reorderbuffer.c.

2739 {
2740  ReorderBufferTXN *txn;
2741 
2742  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2743  false);
2744 
2745  /* unknown transaction, nothing to replay */
2746  if (txn == NULL)
2747  return;
2748 
2749  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2750  origin_id, origin_lsn);
2751 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1192 of file reorderbuffer.c.

1195 {
1196  ReorderBufferTXN *subtxn;
1197 
1198  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1199  InvalidXLogRecPtr, false);
1200 
1201  /*
1202  * No need to do anything if that subtxn didn't contain any changes
1203  */
1204  if (!subtxn)
1205  return;
1206 
1207  subtxn->final_lsn = commit_lsn;
1208  subtxn->end_lsn = end_lsn;
1209 
1210  /*
1211  * Assign this subxact as a child of the toplevel xact (no-op if already
1212  * done.)
1213  */
1214  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1215 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1828 of file reorderbuffer.c.

1830 {
1831  Snapshot snap;
1832  dlist_iter iter;
1833  int i = 0;
1834  Size size;
1835 
1836  size = sizeof(SnapshotData) +
1837  sizeof(TransactionId) * orig_snap->xcnt +
1838  sizeof(TransactionId) * (txn->nsubtxns + 1);
1839 
1840  snap = MemoryContextAllocZero(rb->context, size);
1841  memcpy(snap, orig_snap, sizeof(SnapshotData));
1842 
1843  snap->copied = true;
1844  snap->active_count = 1; /* mark as active so nobody frees it */
1845  snap->regd_count = 0;
1846  snap->xip = (TransactionId *) (snap + 1);
1847 
1848  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1849 
1850  /*
1851  * snap->subxip contains all txids that belong to our transaction which we
1852  * need to check via cmin/cmax. That's why we store the toplevel
1853  * transaction in there as well.
1854  */
1855  snap->subxip = snap->xip + snap->xcnt;
1856  snap->subxip[i++] = txn->xid;
1857 
1858  /*
1859  * subxcnt isn't decreased when subtransactions abort, so count manually.
1860  * Since it's an upper boundary it is safe to use it for the allocation
1861  * above.
1862  */
1863  snap->subxcnt = 1;
1864 
1865  dlist_foreach(iter, &txn->subtxns)
1866  {
1867  ReorderBufferTXN *sub_txn;
1868 
1869  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1870  snap->subxip[i++] = sub_txn->xid;
1871  snap->subxcnt++;
1872  }
1873 
1874  /* sort so we can bsearch() later */
1875  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1876 
1877  /* store the specified current CommandId */
1878  snap->curcid = cid;
1879 
1880  return snap;
1881 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
#define qsort(a, b, c, d)
Definition: port.h:447
static pg_noinline void Size size
Definition: slab.c:607
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, size, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3383 of file reorderbuffer.c.

3384 {
3385  int i;
3386 
3387  for (i = 0; i < nmsgs; i++)
3389 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2841 of file reorderbuffer.c.

2846 {
2847  ReorderBufferTXN *txn;
2848  XLogRecPtr prepare_end_lsn;
2849  TimestampTz prepare_time;
2850 
2851  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2852 
2853  /* unknown transaction, nothing to do */
2854  if (txn == NULL)
2855  return;
2856 
2857  /*
2858  * By this time the txn has the prepare record information, remember it to
2859  * be later used for rollback.
2860  */
2861  prepare_end_lsn = txn->end_lsn;
2862  prepare_time = txn->xact_time.prepare_time;
2863 
2864  /* add the gid in the txn */
2865  txn->gid = pstrdup(gid);
2866 
2867  /*
2868  * It is possible that this transaction is not decoded at prepare time
2869  * either because by that time we didn't have a consistent snapshot, or
2870  * two_phase was not enabled, or it was decoded earlier but we have
2871  * restarted. We only need to send the prepare if it was not decoded
2872  * earlier. We don't need to decode the xact for aborts if it is not done
2873  * already.
2874  */
2875  if ((txn->final_lsn < two_phase_at) && is_commit)
2876  {
2877  txn->txn_flags |= RBTXN_PREPARE;
2878 
2879  /*
2880  * The prepare info must have been updated in txn even if we skip
2881  * prepare.
2882  */
2884 
2885  /*
2886  * By this time the txn has the prepare record information and it is
2887  * important to use that so that downstream gets the accurate
2888  * information. If instead, we have passed commit information here
2889  * then downstream can behave as it has already replayed commit
2890  * prepared after the restart.
2891  */
2892  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2893  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2894  }
2895 
2896  txn->final_lsn = commit_lsn;
2897  txn->end_lsn = end_lsn;
2898  txn->xact_time.commit_time = commit_time;
2899  txn->origin_id = origin_id;
2900  txn->origin_lsn = origin_lsn;
2901 
2902  if (is_commit)
2903  rb->commit_prepared(rb, txn, commit_lsn);
2904  else
2905  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2906 
2907  /* cleanup: make sure there's no cache pollution */
2909  txn->invalidations);
2910  ReorderBufferCleanupTXN(rb, txn);
2911 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3018 of file reorderbuffer.c.

3019 {
3020  ReorderBufferTXN *txn;
3021 
3022  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3023  false);
3024 
3025  /* unknown, nothing to forget */
3026  if (txn == NULL)
3027  return;
3028 
3029  /* this transaction mustn't be streamed */
3030  Assert(!rbtxn_is_streamed(txn));
3031 
3032  /* cosmetic... */
3033  txn->final_lsn = lsn;
3034 
3035  /*
3036  * Process cache invalidation messages if there are any. Even if we're not
3037  * interested in the transaction's contents, it could have manipulated the
3038  * catalog and we need to update the caches according to that.
3039  */
3040  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3042  txn->invalidations);
3043  else
3044  Assert(txn->ninvalidations == 0);
3045 
3046  /* remove potential on-disk data, and deallocate */
3047  ReorderBufferCleanupTXN(rb, txn);
3048 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 396 of file reorderbuffer.c.

397 {
399 
400  /*
401  * We free separately allocated data by entirely scrapping reorderbuffer's
402  * memory context.
403  */
405 
406  /* Free disk space used by unconsumed reorder buffers */
408 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
tree context
Definition: radixtree.h:1835

References context, ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1887 of file reorderbuffer.c.

1888 {
1889  if (snap->copied)
1890  pfree(snap);
1891  else
1893 }
void pfree(void *pointer)
Definition: mcxt.c:1521

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3433 of file reorderbuffer.c.

3434 {
3435  dlist_iter iter;
3436  TransactionId *xids = NULL;
3437  size_t xcnt = 0;
3438 
3439  /* Quick return if the list is empty */
3440  if (dclist_count(&rb->catchange_txns) == 0)
3441  return NULL;
3442 
3443  /* Initialize XID array */
3444  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3446  dclist_foreach(iter, &rb->catchange_txns)
3447  {
3449  catchange_node,
3450  iter.cur);
3451 
3453 
3454  xids[xcnt++] = txn->xid;
3455  }
3456 
3457  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3458 
3459  Assert(xcnt == dclist_count(&rb->catchange_txns));
3460  return xids;
3461 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1017 of file reorderbuffer.c.

1018 {
1019  ReorderBufferTXN *txn;
1020 
1021  AssertTXNLsnOrder(rb);
1022 
1023  if (dlist_is_empty(&rb->toplevel_by_lsn))
1024  return NULL;
1025 
1027 
1030  return txn;
1031 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 1045 of file reorderbuffer.c.

1046 {
1047  ReorderBufferTXN *txn;
1048 
1049  AssertTXNLsnOrder(rb);
1050 
1052  return InvalidTransactionId;
1053 
1054  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1056  return txn->base_snapshot->xmin;
1057 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 598 of file reorderbuffer.c.

599 {
600  Oid *relids;
601  Size alloc_len;
602 
603  alloc_len = sizeof(Oid) * nrelids;
604 
605  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
606 
607  return relids;
608 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 565 of file reorderbuffer.c.

566 {
567  HeapTuple tuple;
568  Size alloc_len;
569 
570  alloc_len = tuple_len + SizeofHeapTupleHeader;
571 
573  HEAPTUPLESIZE + alloc_len);
574  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
575 
576  return tuple;
577 }
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 414 of file reorderbuffer.c.

415 {
416  ReorderBufferTXN *txn;
417 
418  txn = (ReorderBufferTXN *)
420 
421  memset(txn, 0, sizeof(ReorderBufferTXN));
422 
423  dlist_init(&txn->changes);
424  dlist_init(&txn->tuplecids);
425  dlist_init(&txn->subtxns);
426 
427  /* InvalidCommandId is not zero, so set it explicitly */
429  txn->output_plugin_private = NULL;
430 
431  return txn;
432 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3090 of file reorderbuffer.c.

3092 {
3093  bool use_subtxn = IsTransactionOrTransactionBlock();
3094  int i;
3095 
3096  if (use_subtxn)
3097  BeginInternalSubTransaction("replay");
3098 
3099  /*
3100  * Force invalidations to happen outside of a valid transaction - that way
3101  * entries will just be marked as invalid without accessing the catalog.
3102  * That's advantageous because we don't need to setup the full state
3103  * necessary for catalog access.
3104  */
3105  if (use_subtxn)
3107 
3108  for (i = 0; i < ninvalidations; i++)
3109  LocalExecuteInvalidationMessage(&invalidations[i]);
3110 
3111  if (use_subtxn)
3113 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4982
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4687
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4789
void AbortCurrentTransaction(void)
Definition: xact.c:3431

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3059 of file reorderbuffer.c.

3060 {
3061  ReorderBufferTXN *txn;
3062 
3063  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3064  false);
3065 
3066  /* unknown, nothing to do */
3067  if (txn == NULL)
3068  return;
3069 
3070  /*
3071  * Process cache invalidation messages if there are any. Even if we're not
3072  * interested in the transaction's contents, it could have manipulated the
3073  * catalog and we need to update the caches according to that.
3074  */
3075  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3077  txn->invalidations);
3078  else
3079  Assert(txn->ninvalidations == 0);
3080 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1234 of file reorderbuffer.c.

1235 {
1237  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1238  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1239 
1240  if (pos_a < pos_b)
1241  return 1;
1242  else if (pos_a == pos_b)
1243  return 0;
1244  return -1;
1245 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1477 of file reorderbuffer.c.

1479 {
1480  int32 off;
1481 
1482  for (off = 0; off < state->nr_txns; off++)
1483  {
1484  if (state->entries[off].file.vfd != -1)
1485  FileClose(state->entries[off].file.vfd);
1486  }
1487 
1488  /* free memory we might have "leaked" in the last *Next call */
1489  if (!dlist_is_empty(&state->old_change))
1490  {
1491  ReorderBufferChange *change;
1492 
1493  change = dlist_container(ReorderBufferChange, node,
1494  dlist_pop_head_node(&state->old_change));
1495  ReorderBufferReturnChange(rb, change, true);
1496  Assert(dlist_is_empty(&state->old_change));
1497  }
1498 
1499  binaryheap_free(state->heap);
1500  pfree(state);
1501 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1978
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1257 of file reorderbuffer.c.

1259 {
1260  Size nr_txns = 0;
1262  dlist_iter cur_txn_i;
1263  int32 off;
1264 
1265  *iter_state = NULL;
1266 
1267  /* Check ordering of changes in the toplevel transaction. */
1268  AssertChangeLsnOrder(txn);
1269 
1270  /*
1271  * Calculate the size of our heap: one element for every transaction that
1272  * contains changes. (Besides the transactions already in the reorder
1273  * buffer, we count the one we were directly passed.)
1274  */
1275  if (txn->nentries > 0)
1276  nr_txns++;
1277 
1278  dlist_foreach(cur_txn_i, &txn->subtxns)
1279  {
1280  ReorderBufferTXN *cur_txn;
1281 
1282  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1283 
1284  /* Check ordering of changes in this subtransaction. */
1285  AssertChangeLsnOrder(cur_txn);
1286 
1287  if (cur_txn->nentries > 0)
1288  nr_txns++;
1289  }
1290 
1291  /* allocate iteration state */
1294  sizeof(ReorderBufferIterTXNState) +
1295  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1296 
1297  state->nr_txns = nr_txns;
1298  dlist_init(&state->old_change);
1299 
1300  for (off = 0; off < state->nr_txns; off++)
1301  {
1302  state->entries[off].file.vfd = -1;
1303  state->entries[off].segno = 0;
1304  }
1305 
1306  /* allocate heap */
1307  state->heap = binaryheap_allocate(state->nr_txns,
1309  state);
1310 
1311  /* Now that the state fields are initialized, it is safe to return it. */
1312  *iter_state = state;
1313 
1314  /*
1315  * Now insert items into the binary heap, in an unordered fashion. (We
1316  * will run a heap assembly step at the end; this is more efficient.)
1317  */
1318 
1319  off = 0;
1320 
1321  /* add toplevel transaction if it contains changes */
1322  if (txn->nentries > 0)
1323  {
1324  ReorderBufferChange *cur_change;
1325 
1326  if (rbtxn_is_serialized(txn))
1327  {
1328  /* serialize remaining changes */
1329  ReorderBufferSerializeTXN(rb, txn);
1330  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1331  &state->entries[off].segno);
1332  }
1333 
1334  cur_change = dlist_head_element(ReorderBufferChange, node,
1335  &txn->changes);
1336 
1337  state->entries[off].lsn = cur_change->lsn;
1338  state->entries[off].change = cur_change;
1339  state->entries[off].txn = txn;
1340 
1342  }
1343 
1344  /* add subtransactions if they contain changes */
1345  dlist_foreach(cur_txn_i, &txn->subtxns)
1346  {
1347  ReorderBufferTXN *cur_txn;
1348 
1349  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1350 
1351  if (cur_txn->nentries > 0)
1352  {
1353  ReorderBufferChange *cur_change;
1354 
1355  if (rbtxn_is_serialized(cur_txn))
1356  {
1357  /* serialize remaining changes */
1358  ReorderBufferSerializeTXN(rb, cur_txn);
1359  ReorderBufferRestoreChanges(rb, cur_txn,
1360  &state->entries[off].file,
1361  &state->entries[off].segno);
1362  }
1363  cur_change = dlist_head_element(ReorderBufferChange, node,
1364  &cur_txn->changes);
1365 
1366  state->entries[off].lsn = cur_change->lsn;
1367  state->entries[off].change = cur_change;
1368  state->entries[off].txn = cur_txn;
1369 
1371  }
1372  }
1373 
1374  /* assemble a valid binary heap */
1375  binaryheap_build(state->heap);
1376 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1385 of file reorderbuffer.c.

1386 {
1387  ReorderBufferChange *change;
1389  int32 off;
1390 
1391  /* nothing there anymore */
1392  if (state->heap->bh_size == 0)
1393  return NULL;
1394 
1395  off = DatumGetInt32(binaryheap_first(state->heap));
1396  entry = &state->entries[off];
1397 
1398  /* free memory we might have "leaked" in the previous *Next call */
1399  if (!dlist_is_empty(&state->old_change))
1400  {
1401  change = dlist_container(ReorderBufferChange, node,
1402  dlist_pop_head_node(&state->old_change));
1403  ReorderBufferReturnChange(rb, change, true);
1404  Assert(dlist_is_empty(&state->old_change));
1405  }
1406 
1407  change = entry->change;
1408 
1409  /*
1410  * update heap with information about which transaction has the next
1411  * relevant change in LSN order
1412  */
1413 
1414  /* there are in-memory changes */
1415  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1416  {
1417  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1418  ReorderBufferChange *next_change =
1420 
1421  /* txn stays the same */
1422  state->entries[off].lsn = next_change->lsn;
1423  state->entries[off].change = next_change;
1424 
1426  return change;
1427  }
1428 
1429  /* try to load changes from disk */
1430  if (entry->txn->nentries != entry->txn->nentries_mem)
1431  {
1432  /*
1433  * Ugly: restoring changes will reuse *Change records, thus delete the
1434  * current one from the per-tx list and only free in the next call.
1435  */
1436  dlist_delete(&change->node);
1437  dlist_push_tail(&state->old_change, &change->node);
1438 
1439  /*
1440  * Update the total bytes processed by the txn for which we are
1441  * releasing the current set of changes and restoring the new set of
1442  * changes.
1443  */
1444  rb->totalBytes += entry->txn->size;
1445  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1446  &state->entries[off].segno))
1447  {
1448  /* successfully restored changes from disk */
1449  ReorderBufferChange *next_change =
1451  &entry->txn->changes);
1452 
1453  elog(DEBUG2, "restored %u/%u changes from disk",
1454  (uint32) entry->txn->nentries_mem,
1455  (uint32) entry->txn->nentries);
1456 
1457  Assert(entry->txn->nentries_mem);
1458  /* txn stays the same */
1459  state->entries[off].lsn = next_change->lsn;
1460  state->entries[off].change = next_change;
1462 
1463  return change;
1464  }
1465  }
1466 
1467  /* ok, no changes there anymore, remove */
1469 
1470  return change;
1471 }
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:222
unsigned int uint32
Definition: c.h:509
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3588 of file reorderbuffer.c.

3589 {
3590  dlist_iter iter;
3591  Size largest_size = 0;
3592  ReorderBufferTXN *largest = NULL;
3593 
3594  /* Find the largest top-level transaction having a base snapshot. */
3596  {
3597  ReorderBufferTXN *txn;
3598 
3599  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3600 
3601  /* must not be a subtxn */
3603  /* base_snapshot must be set */
3604  Assert(txn->base_snapshot != NULL);
3605 
3606  if ((largest == NULL || txn->total_size > largest_size) &&
3607  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3609  {
3610  largest = txn;
3611  largest_size = txn->total_size;
3612  }
3613  }
3614 
3615  return largest;
3616 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3548 of file reorderbuffer.c.

3549 {
3550  ReorderBufferTXN *largest;
3551 
3552  /* Get the largest transaction from the max-heap */
3553  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3555 
3556  Assert(largest);
3557  Assert(largest->size > 0);
3558  Assert(largest->size <= rb->size);
3559 
3560  return largest;
3561 }
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert, pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2804 of file reorderbuffer.c.

2806 {
2807  ReorderBufferTXN *txn;
2808 
2809  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2810  false);
2811 
2812  /* unknown transaction, nothing to replay */
2813  if (txn == NULL)
2814  return;
2815 
2816  txn->txn_flags |= RBTXN_PREPARE;
2817  txn->gid = pstrdup(gid);
2818 
2819  /* The prepare info must have been updated in txn by now. */
2821 
2822  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2823  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2824 
2825  /*
2826  * We send the prepare for the concurrently aborted xacts so that later
2827  * when rollback prepared is decoded and sent, the downstream should be
2828  * able to rollback such a xact. See comments atop DecodePrepare.
2829  *
2830  * Note, for the concurrent_abort + streaming case a stream_prepare was
2831  * already sent within the ReorderBufferReplay call above.
2832  */
2833  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2834  rb->prepare(rb, txn, txn->final_lsn);
2835 }
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 714 of file reorderbuffer.c.

717 {
718  ReorderBufferTXN *toptxn;
719 
720  /*
721  * The partial changes need to be processed only while streaming
722  * in-progress transactions.
723  */
724  if (!ReorderBufferCanStream(rb))
725  return;
726 
727  /* Get the top transaction. */
728  toptxn = rbtxn_get_toptxn(txn);
729 
730  /*
731  * Indicate a partial change for toast inserts. The change will be
732  * considered as complete once we get the insert or update on the main
733  * table and we are sure that the pending toast chunks are not required
734  * anymore.
735  *
736  * If we allow streaming when there are pending toast chunks then such
737  * chunks won't be released till the insert (multi_insert) is complete and
738  * we expect the txn to have streamed all changes after streaming. This
739  * restriction is mainly to ensure the correctness of streamed
740  * transactions and it doesn't seem worth uplifting such a restriction
741  * just to allow this case because anyway we will stream the transaction
742  * once such an insert is complete.
743  */
744  if (toast_insert)
746  else if (rbtxn_has_partial_change(toptxn) &&
747  IsInsertOrUpdate(change->action) &&
748  change->data.tp.clear_toast_afterwards)
750 
751  /*
752  * Indicate a partial change for speculative inserts. The change will be
753  * considered as complete once we get the speculative confirm or abort
754  * token.
755  */
756  if (IsSpecInsert(change->action))
758  else if (rbtxn_has_partial_change(toptxn) &&
759  IsSpecConfirmOrAbort(change->action))
761 
762  /*
763  * Stream the transaction if it is serialized before and the changes are
764  * now complete in the top-level transaction.
765  *
766  * The reason for doing the streaming of such a transaction as soon as we
767  * get the complete change for it is that previously it would have reached
768  * the memory threshold and wouldn't get streamed because of incomplete
769  * changes. Delaying such transactions would increase apply lag for them.
770  */
772  !(rbtxn_has_partial_change(toptxn)) &&
773  rbtxn_is_serialized(txn) &&
775  ReorderBufferStreamTXN(rb, toptxn);
776 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2104 of file reorderbuffer.c.

2109 {
2110  bool using_subtxn;
2112  ReorderBufferIterTXNState *volatile iterstate = NULL;
2113  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2114  ReorderBufferChange *volatile specinsert = NULL;
2115  volatile bool stream_started = false;
2116  ReorderBufferTXN *volatile curtxn = NULL;
2117 
2118  /* build data to be able to lookup the CommandIds of catalog tuples */
2120 
2121  /* setup the initial snapshot */
2122  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2123 
2124  /*
2125  * Decoding needs access to syscaches et al., which in turn use
2126  * heavyweight locks and such. Thus we need to have enough state around to
2127  * keep track of those. The easiest way is to simply use a transaction
2128  * internally. That also allows us to easily enforce that nothing writes
2129  * to the database by checking for xid assignments.
2130  *
2131  * When we're called via the SQL SRF there's already a transaction
2132  * started, so start an explicit subtransaction there.
2133  */
2134  using_subtxn = IsTransactionOrTransactionBlock();
2135 
2136  PG_TRY();
2137  {
2138  ReorderBufferChange *change;
2139  int changes_count = 0; /* used to accumulate the number of
2140  * changes */
2141 
2142  if (using_subtxn)
2143  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2144  else
2146 
2147  /*
2148  * We only need to send begin/begin-prepare for non-streamed
2149  * transactions.
2150  */
2151  if (!streaming)
2152  {
2153  if (rbtxn_prepared(txn))
2154  rb->begin_prepare(rb, txn);
2155  else
2156  rb->begin(rb, txn);
2157  }
2158 
2159  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2160  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2161  {
2162  Relation relation = NULL;
2163  Oid reloid;
2164 
2166 
2167  /*
2168  * We can't call start stream callback before processing first
2169  * change.
2170  */
2171  if (prev_lsn == InvalidXLogRecPtr)
2172  {
2173  if (streaming)
2174  {
2175  txn->origin_id = change->origin_id;
2176  rb->stream_start(rb, txn, change->lsn);
2177  stream_started = true;
2178  }
2179  }
2180 
2181  /*
2182  * Enforce correct ordering of changes, merged from multiple
2183  * subtransactions. The changes may have the same LSN due to
2184  * MULTI_INSERT xlog records.
2185  */
2186  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2187 
2188  prev_lsn = change->lsn;
2189 
2190  /*
2191  * Set the current xid to detect concurrent aborts. This is
2192  * required for the cases when we decode the changes before the
2193  * COMMIT record is processed.
2194  */
2195  if (streaming || rbtxn_prepared(change->txn))
2196  {
2197  curtxn = change->txn;
2198  SetupCheckXidLive(curtxn->xid);
2199  }
2200 
2201  switch (change->action)
2202  {
2204 
2205  /*
2206  * Confirmation for speculative insertion arrived. Simply
2207  * use as a normal record. It'll be cleaned up at the end
2208  * of INSERT processing.
2209  */
2210  if (specinsert == NULL)
2211  elog(ERROR, "invalid ordering of speculative insertion changes");
2212  Assert(specinsert->data.tp.oldtuple == NULL);
2213  change = specinsert;
2215 
2216  /* intentionally fall through */
2220  Assert(snapshot_now);
2221 
2222  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2223  change->data.tp.rlocator.relNumber);
2224 
2225  /*
2226  * Mapped catalog tuple without data, emitted while
2227  * catalog table was in the process of being rewritten. We
2228  * can fail to look up the relfilenumber, because the
2229  * relmapper has no "historic" view, in contrast to the
2230  * normal catalog during decoding. Thus repeated rewrites
2231  * can cause a lookup failure. That's OK because we do not
2232  * decode catalog changes anyway. Normally such tuples
2233  * would be skipped over below, but we can't identify
2234  * whether the table should be logically logged without
2235  * mapping the relfilenumber to the oid.
2236  */
2237  if (reloid == InvalidOid &&
2238  change->data.tp.newtuple == NULL &&
2239  change->data.tp.oldtuple == NULL)
2240  goto change_done;
2241  else if (reloid == InvalidOid)
2242  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2243  relpathperm(change->data.tp.rlocator,
2244  MAIN_FORKNUM));
2245 
2246  relation = RelationIdGetRelation(reloid);
2247 
2248  if (!RelationIsValid(relation))
2249  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2250  reloid,
2251  relpathperm(change->data.tp.rlocator,
2252  MAIN_FORKNUM));
2253 
2254  if (!RelationIsLogicallyLogged(relation))
2255  goto change_done;
2256 
2257  /*
2258  * Ignore temporary heaps created during DDL unless the
2259  * plugin has asked for them.
2260  */
2261  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2262  goto change_done;
2263 
2264  /*
2265  * For now ignore sequence changes entirely. Most of the
2266  * time they don't log changes using records we
2267  * understand, so it doesn't make sense to handle the few
2268  * cases we do.
2269  */
2270  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2271  goto change_done;
2272 
2273  /* user-triggered change */
2274  if (!IsToastRelation(relation))
2275  {
2276  ReorderBufferToastReplace(rb, txn, relation, change);
2277  ReorderBufferApplyChange(rb, txn, relation, change,
2278  streaming);
2279 
2280  /*
2281  * Only clear reassembled toast chunks if we're sure
2282  * they're not required anymore. The creator of the
2283  * tuple tells us.
2284  */
2285  if (change->data.tp.clear_toast_afterwards)
2286  ReorderBufferToastReset(rb, txn);
2287  }
2288  /* we're not interested in toast deletions */
2289  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2290  {
2291  /*
2292  * Need to reassemble the full toasted Datum in
2293  * memory. To ensure the chunks don't get reused till
2294  * we're done, remove it from the list of this
2295  * transaction's changes. Otherwise it will get
2296  * freed/reused while restoring spooled data from
2297  * disk.
2298  */
2299  Assert(change->data.tp.newtuple != NULL);
2300 
2301  dlist_delete(&change->node);
2302  ReorderBufferToastAppendChunk(rb, txn, relation,
2303  change);
2304  }
2305 
2306  change_done:
2307 
2308  /*
2309  * If speculative insertion was confirmed, the record
2310  * isn't needed anymore.
2311  */
2312  if (specinsert != NULL)
2313  {
2314  ReorderBufferReturnChange(rb, specinsert, true);
2315  specinsert = NULL;
2316  }
2317 
2318  if (RelationIsValid(relation))
2319  {
2320  RelationClose(relation);
2321  relation = NULL;
2322  }
2323  break;
2324 
2326 
2327  /*
2328  * Speculative insertions are dealt with by delaying the
2329  * processing of the insert until the confirmation record
2330  * arrives. For that we simply unlink the record from the
2331  * chain, so it does not get freed/reused while restoring
2332  * spooled data from disk.
2333  *
2334  * This is safe in the face of concurrent catalog changes
2335  * because the relevant relation can't be changed between
2336  * speculative insertion and confirmation due to
2337  * CheckTableNotInUse() and locking.
2338  */
2339 
2340  /* clear out a pending (and thus failed) speculation */
2341  if (specinsert != NULL)
2342  {
2343  ReorderBufferReturnChange(rb, specinsert, true);
2344  specinsert = NULL;
2345  }
2346 
2347  /* and memorize the pending insertion */
2348  dlist_delete(&change->node);
2349  specinsert = change;
2350  break;
2351 
2353 
2354  /*
2355  * Abort for speculative insertion arrived. So cleanup the
2356  * specinsert tuple and toast hash.
2357  *
2358  * Note that we get the spec abort change for each toast
2359  * entry but we need to perform the cleanup only the first
2360  * time we get it for the main table.
2361  */
2362  if (specinsert != NULL)
2363  {
2364  /*
2365  * We must clean the toast hash before processing a
2366  * completely new tuple to avoid confusion about the
2367  * previous tuple's toast chunks.
2368  */
2369  Assert(change->data.tp.clear_toast_afterwards);
2370  ReorderBufferToastReset(rb, txn);
2371 
2372  /* We don't need this record anymore. */
2373  ReorderBufferReturnChange(rb, specinsert, true);
2374  specinsert = NULL;
2375  }
2376  break;
2377 
2379  {
2380  int i;
2381  int nrelids = change->data.truncate.nrelids;
2382  int nrelations = 0;
2383  Relation *relations;
2384 
2385  relations = palloc0(nrelids * sizeof(Relation));
2386  for (i = 0; i < nrelids; i++)
2387  {
2388  Oid relid = change->data.truncate.relids[i];
2389  Relation rel;
2390 
2391  rel = RelationIdGetRelation(relid);
2392 
2393  if (!RelationIsValid(rel))
2394  elog(ERROR, "could not open relation with OID %u", relid);
2395 
2396  if (!RelationIsLogicallyLogged(rel))
2397  continue;
2398 
2399  relations[nrelations++] = rel;
2400  }
2401 
2402  /* Apply the truncate. */
2403  ReorderBufferApplyTruncate(rb, txn, nrelations,
2404  relations, change,
2405  streaming);
2406 
2407  for (i = 0; i < nrelations; i++)
2408  RelationClose(relations[i]);
2409 
2410  break;
2411  }
2412 
2414  ReorderBufferApplyMessage(rb, txn, change, streaming);
2415  break;
2416 
2418  /* Execute the invalidation messages locally */
2419  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2420  change->data.inval.invalidations);
2421  break;
2422 
2424  /* get rid of the old */
2425  TeardownHistoricSnapshot(false);
2426 
2427  if (snapshot_now->copied)
2428  {
2429  ReorderBufferFreeSnap(rb, snapshot_now);
2430  snapshot_now =
2431  ReorderBufferCopySnap(rb, change->data.snapshot,
2432  txn, command_id);
2433  }
2434 
2435  /*
2436  * Restored from disk, need to be careful not to double
2437  * free. We could introduce refcounting for that, but for
2438  * now this seems infrequent enough not to care.
2439  */
2440  else if (change->data.snapshot->copied)
2441  {
2442  snapshot_now =
2443  ReorderBufferCopySnap(rb, change->data.snapshot,
2444  txn, command_id);
2445  }
2446  else
2447  {
2448  snapshot_now = change->data.snapshot;
2449  }
2450 
2451  /* and continue with the new one */
2452  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2453  break;
2454 
2456  Assert(change->data.command_id != InvalidCommandId);
2457 
2458  if (command_id < change->data.command_id)
2459  {
2460  command_id = change->data.command_id;
2461 
2462  if (!snapshot_now->copied)
2463  {
2464  /* we don't use the global one anymore */
2465  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2466  txn, command_id);
2467  }
2468 
2469  snapshot_now->curcid = command_id;
2470 
2471  TeardownHistoricSnapshot(false);
2472  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2473  }
2474 
2475  break;
2476 
2478  elog(ERROR, "tuplecid value in changequeue");
2479  break;
2480  }
2481 
2482  /*
2483  * It is possible that the data is not sent to downstream for a
2484  * long time either because the output plugin filtered it or there
2485  * is a DDL that generates a lot of data that is not processed by
2486  * the plugin. So, in such cases, the downstream can timeout. To
2487  * avoid that we try to send a keepalive message if required.
2488  * Trying to send a keepalive message after every change has some
2489  * overhead, but testing showed there is no noticeable overhead if
2490  * we do it after every ~100 changes.
2491  */
2492 #define CHANGES_THRESHOLD 100
2493 
2494  if (++changes_count >= CHANGES_THRESHOLD)
2495  {
2496  rb->update_progress_txn(rb, txn, change->lsn);
2497  changes_count = 0;
2498  }
2499  }
2500 
2501  /* speculative insertion record must be freed by now */
2502  Assert(!specinsert);
2503 
2504  /* clean up the iterator */
2505  ReorderBufferIterTXNFinish(rb, iterstate);
2506  iterstate = NULL;
2507 
2508  /*
2509  * Update total transaction count and total bytes processed by the
2510  * transaction and its subtransactions. Make sure not to count the
2511  * streamed transaction multiple times.
2512  *
2513  * Note that the statistics computation has to be done after
2514  * ReorderBufferIterTXNFinish as it releases the serialized change
2515  * which we have already accounted in ReorderBufferIterTXNNext.
2516  */
2517  if (!rbtxn_is_streamed(txn))
2518  rb->totalTxns++;
2519 
2520  rb->totalBytes += txn->total_size;
2521 
2522  /*
2523  * Done with current changes, send the last message for this set of
2524  * changes depending upon streaming mode.
2525  */
2526  if (streaming)
2527  {
2528  if (stream_started)
2529  {
2530  rb->stream_stop(rb, txn, prev_lsn);
2531  stream_started = false;
2532  }
2533  }
2534  else
2535  {
2536  /*
2537  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2538  * regular ones).
2539  */
2540  if (rbtxn_prepared(txn))
2541  rb->prepare(rb, txn, commit_lsn);
2542  else
2543  rb->commit(rb, txn, commit_lsn);
2544  }
2545 
2546  /* this is just a sanity check against bad output plugin behaviour */
2548  elog(ERROR, "output plugin used XID %u",
2550 
2551  /*
2552  * Remember the command ID and snapshot for the next set of changes in
2553  * streaming mode.
2554  */
2555  if (streaming)
2556  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2557  else if (snapshot_now->copied)
2558  ReorderBufferFreeSnap(rb, snapshot_now);
2559 
2560  /* cleanup */
2561  TeardownHistoricSnapshot(false);
2562 
2563  /*
2564  * Aborting the current (sub-)transaction as a whole has the right
2565  * semantics. We want all locks acquired in here to be released, not
2566  * reassigned to the parent and we do not want any database access
2567  * to have persistent effects.
2568  */
2570 
2571  /* make sure there's no cache pollution */
2573 
2574  if (using_subtxn)
2576 
2577  /*
2578  * We are here due to one of the four reasons: 1. Decoding an
2579  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2580  * prepared txn that was (partially) streamed. 4. Decoding a committed
2581  * txn.
2582  *
2583  * For 1, we allow truncation of txn data by removing the changes
2584  * already streamed but still keeping other things like invalidations,
2585  * snapshot, and tuplecids. For 2 and 3, we indicate
2586  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2587  * data as the entire transaction has been decoded except for commit.
2588  * For 4, as the entire txn has been decoded, we can fully clean up
2589  * the TXN reorder buffer.
2590  */
2591  if (streaming || rbtxn_prepared(txn))
2592  {
2594  /* Reset the CheckXidAlive */
2596  }
2597  else
2598  ReorderBufferCleanupTXN(rb, txn);
2599  }
2600  PG_CATCH();
2601  {
2602  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2603  ErrorData *errdata = CopyErrorData();
2604 
2605  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2606  if (iterstate)
2607  ReorderBufferIterTXNFinish(rb, iterstate);
2608 
2610 
2611  /*
2612  * Force cache invalidation to happen outside of a valid transaction
2613  * to prevent catalog access as we just caught an error.
2614  */
2616 
2617  /* make sure there's no cache pollution */
2619  txn->invalidations);
2620 
2621  if (using_subtxn)
2623 
2624  /*
2625  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2626  * abort of the (sub)transaction we are streaming or preparing. We
2627  * need to do the cleanup and return gracefully on this error, see
2628  * SetupCheckXidLive.
2629  *
2630  * This error code can be thrown by one of the callbacks we call
2631  * during decoding so we need to ensure that we return gracefully only
2632  * when we are sending the data in streaming mode and the streaming is
2633  * not finished yet or when we are sending the data out on a PREPARE
2634  * during a two-phase commit.
2635  */
2636  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2637  (stream_started || rbtxn_prepared(txn)))
2638  {
2639  /* curtxn must be set for streaming or prepared transactions */
2640  Assert(curtxn);
2641 
2642  /* Cleanup the temporary error state. */
2643  FlushErrorState();
2644  FreeErrorData(errdata);
2645  errdata = NULL;
2646  curtxn->concurrent_abort = true;
2647 
2648  /* Reset the TXN so that it is allowed to stream remaining data. */
2649  ReorderBufferResetTXN(rb, txn, snapshot_now,
2650  command_id, prev_lsn,
2651  specinsert);
2652  }
2653  else
2654  {
2655  ReorderBufferCleanupTXN(rb, txn);
2656  MemoryContextSwitchTo(ecxt);
2657  PG_RE_THROW();
2658  }
2659  }
2660  PG_END_TRY();
2661 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:175
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1818
void FlushErrorState(void)
Definition: elog.c:1867
ErrorData * CopyErrorData(void)
Definition: elog.c:1746
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
void * palloc0(Size size)
Definition: mcxt.c:1347
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RelationIsValid(relation)
Definition: rel.h:478
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2061
void RelationClose(Relation relation)
Definition: relcache.c:2192
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:98
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
int sqlerrcode
Definition: elog.h:439
Form_pg_class rd_rel
Definition: rel.h:111
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:98
void StartTransactionCommand(void)
Definition: xact.c:3039
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:470
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:453

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), 
ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3126 of file reorderbuffer.c.

3127 {
3128  /* many records won't have an xid assigned, centralize check here */
3129  if (xid != InvalidTransactionId)
3130  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3131 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 783 of file reorderbuffer.c.

785 {
786  ReorderBufferTXN *txn;
787 
788  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
789 
790  /*
791  * While streaming the previous changes we have detected that the
792  * transaction is aborted. So there is no point in collecting further
793  * changes for it.
794  */
795  if (txn->concurrent_abort)
796  {
797  /*
798  * We don't need to update memory accounting for this change as we
799  * have not added it to the queue yet.
800  */
801  ReorderBufferReturnChange(rb, change, false);
802  return;
803  }
804 
805  /*
806  * The changes that are sent downstream are considered streamable. We
807  * remember such transactions so that only those will later be considered
808  * for streaming.
809  */
810  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
816  {
817  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
818 
820  }
821 
822  change->lsn = lsn;
823  change->txn = txn;
824 
825  Assert(InvalidXLogRecPtr != lsn);
826  dlist_push_tail(&txn->changes, &change->node);
827  txn->nentries++;
828  txn->nentries_mem++;
829 
830  /* update memory accounting information */
831  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
832  ReorderBufferChangeSize(change));
833 
834  /* process partial change */
835  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
836 
837  /* check the memory limits and evict something if needed */
839 }
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 846 of file reorderbuffer.c.

850 {
851  if (transactional)
852  {
853  MemoryContext oldcontext;
854  ReorderBufferChange *change;
855 
857 
858  /*
859  * We don't expect snapshots for transactional changes - we'll use the
860  * snapshot derived later during apply (unless the change gets
861  * skipped).
862  */
863  Assert(!snap);
864 
865  oldcontext = MemoryContextSwitchTo(rb->context);
866 
867  change = ReorderBufferGetChange(rb);
869  change->data.msg.prefix = pstrdup(prefix);
870  change->data.msg.message_size = message_size;
871  change->data.msg.message = palloc(message_size);
872  memcpy(change->data.msg.message, message, message_size);
873 
874  ReorderBufferQueueChange(rb, xid, lsn, change, false);
875 
876  MemoryContextSwitchTo(oldcontext);
877  }
878  else
879  {
880  ReorderBufferTXN *txn = NULL;
881  volatile Snapshot snapshot_now = snap;
882 
883  /* Non-transactional changes require a valid snapshot. */
884  Assert(snapshot_now);
885 
886  if (xid != InvalidTransactionId)
887  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
888 
889  /* setup snapshot to allow catalog access */
890  SetupHistoricSnapshot(snapshot_now, NULL);
891  PG_TRY();
892  {
893  rb->message(rb, txn, lsn, false, prefix, message_size, message);
894 
896  }
897  PG_CATCH();
898  {
900  PG_RE_THROW();
901  }
902  PG_END_TRY();
903  }
904 }

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2757 of file reorderbuffer.c.

2761 {
2762  ReorderBufferTXN *txn;
2763 
2764  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2765 
2766  /* unknown transaction, nothing to do */
2767  if (txn == NULL)
2768  return false;
2769 
2770  /*
2771  * Remember the prepare information to be later used by commit prepared in
2772  * case we skip doing prepare.
2773  */
2774  txn->final_lsn = prepare_lsn;
2775  txn->end_lsn = end_lsn;
2776  txn->xact_time.prepare_time = prepare_time;
2777  txn->origin_id = origin_id;
2778  txn->origin_lsn = origin_lsn;
2779 
2780  return true;
2781 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2674 of file reorderbuffer.c.

2679 {
2680  Snapshot snapshot_now;
2681  CommandId command_id = FirstCommandId;
2682 
2683  txn->final_lsn = commit_lsn;
2684  txn->end_lsn = end_lsn;
2685  txn->xact_time.commit_time = commit_time;
2686  txn->origin_id = origin_id;
2687  txn->origin_lsn = origin_lsn;
2688 
2689  /*
2690  * If the transaction was (partially) streamed, we need to commit it in a
2691  * 'streamed' way. That is, we first stream the remaining part of the
2692  * transaction, and then invoke stream_commit message.
2693  *
2694  * Called after everything (origin ID, LSN, ...) is stored in the
2695  * transaction to avoid passing that information directly.
2696  */
2697  if (rbtxn_is_streamed(txn))
2698  {
2699  ReorderBufferStreamCommit(rb, txn);
2700  return;
2701  }
2702 
2703  /*
2704  * If this transaction has no snapshot, it didn't make any changes to the
2705  * database, so there's nothing to decode. Note that
2706  * ReorderBufferCommitChild will have transferred any snapshots from
2707  * subtransactions if there were any.
2708  */
2709  if (txn->base_snapshot == NULL)
2710  {
2711  Assert(txn->ninvalidations == 0);
2712 
2713  /*
2714  * Removing this txn before a commit might result in the computation
2715  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2716  */
2717  if (!rbtxn_prepared(txn))
2718  ReorderBufferCleanupTXN(rb, txn);
2719  return;
2720  }
2721 
2722  snapshot_now = txn->base_snapshot;
2723 
2724  /* Process and send the changes to output plugin. */
2725  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2726  command_id, false);
2727 }
#define FirstCommandId
Definition: c.h:671
uint32 CommandId
Definition: c.h:669
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange * specinsert 
)
static

Definition at line 2058 of file reorderbuffer.c.

2063 {
2064  /* Discard the changes that we just streamed */
2065  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2066 
2067  /* Free all resources allocated for toast reconstruction */
2068  ReorderBufferToastReset(rb, txn);
2069 
2070  /* Return the spec insert change if it is not NULL */
2071  if (specinsert != NULL)
2072  {
2073  ReorderBufferReturnChange(rb, specinsert, true);
2074  specinsert = NULL;
2075  }
2076 
2077  /*
2078  * For the streaming case, stop the stream and remember the command ID and
2079  * snapshot for the streaming run.
2080  */
2081  if (rbtxn_is_streamed(txn))
2082  {
2083  rb->stream_stop(rb, txn, last_lsn);
2084  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2085  }
2086 
2087  /* All changes must be deallocated */
2088  Assert(txn->size == 0);
2089 }

References Assert, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
char *  data 
)
static

Definition at line 4395 of file reorderbuffer.c.

4397 {
4398  ReorderBufferDiskChange *ondisk;
4399  ReorderBufferChange *change;
4400 
4401  ondisk = (ReorderBufferDiskChange *) data;
4402 
4403  change = ReorderBufferGetChange(rb);
4404 
4405  /* copy static part */
4406  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4407 
4408  data += sizeof(ReorderBufferDiskChange);
4409 
4410  /* restore individual stuff */
4411  switch (change->action)
4412  {
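4413  /* fall through these, they're all similar enough */
4414  case REORDER_BUFFER_CHANGE_INSERT:
4415  case REORDER_BUFFER_CHANGE_UPDATE:
4416  case REORDER_BUFFER_CHANGE_DELETE:
4417  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: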
4418  if (change->data.tp.oldtuple)
4419  {
4420  uint32 tuplelen = ((HeapTuple) data)->t_len;
4421 
4422  change->data.tp.oldtuple =
4423  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4424 
4425  /* restore ->tuple */
4426  memcpy(change->data.tp.oldtuple, data,
4427  sizeof(HeapTupleData));
4428  data += sizeof(HeapTupleData);
4429 
4430  /* reset t_data pointer into the new tuplebuf */
4431  change->data.tp.oldtuple->t_data =
4432  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4433 
4434  /* restore tuple data itself */
4435  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4436  data += tuplelen;
4437  }
4438 
4439  if (change->data.tp.newtuple)
4440  {
4441  /* here, data might not be suitably aligned! */
4442  uint32 tuplelen;
4443 
4444  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4445  sizeof(uint32));
4446 
4447  change->data.tp.newtuple =
4448  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4449 
4450  /* restore ->tuple */
4451  memcpy(change->data.tp.newtuple, data,
4452  sizeof(HeapTupleData));
4453  data += sizeof(HeapTupleData);
4454 
4455  /* reset t_data pointer into the new tuplebuf */
4456  change->data.tp.newtuple->t_data =
4457  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4458 
4459  /* restore tuple data itself */
4460  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4461  data += tuplelen;
4462  }
4463 
4464  break;
4465  case REORDER_BUFFER_CHANGE_MESSAGE:
4466  {
4467  Size prefix_size;
4468 
4469  /* read prefix */
4470  memcpy(&prefix_size, data, sizeof(Size));
4471  data += sizeof(Size);
4472  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4473  prefix_size);
4474  memcpy(change->data.msg.prefix, data, prefix_size);
4475  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4476  data += prefix_size;
4477 
4478  /* read the message */
4479  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4480  data += sizeof(Size);
4481  change->data.msg.message = MemoryContextAlloc(rb->context,
4482  change->data.msg.message_size);
4483  memcpy(change->data.msg.message, data,
4484  change->data.msg.message_size);
4485  data += change->data.msg.message_size;
4486 
4487  break;
4488  }
4489  case REORDER_BUFFER_CHANGE_INVALIDATION:
4490  {
4491  Size inval_size = sizeof(SharedInvalidationMessage) *
4492  change->data.inval.ninvalidations;
4493 
4494  change->data.inval.invalidations =
4495  MemoryContextAlloc(rb->context, inval_size);
4496 
4497  /* read the message */
4498  memcpy(change->data.inval.invalidations, data, inval_size);
4499 
4500  break;
4501  }
4502  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4503  {
4504  Snapshot oldsnap;
4505  Snapshot newsnap;
4506  Size size;
4507 
4508  oldsnap = (Snapshot) data;
4509 
4510  size = sizeof(SnapshotData) +
4511  sizeof(TransactionId) * oldsnap->xcnt +
4512  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4513 
4514  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4515 
4516  newsnap = change->data.snapshot;
4517 
4518  memcpy(newsnap, data, size);
4519  newsnap->xip = (TransactionId *)
4520  (((char *) newsnap) + sizeof(SnapshotData));
4521  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4522  newsnap->copied = true;
4523  break;
4524  }
4525  /* the base struct contains all the data, easy peasy */
4526  case REORDER_BUFFER_CHANGE_TRUNCATE:
4527  {
4528  Oid *relids;
4529 
4530  relids = ReorderBufferGetRelids(rb,
4531  change->data.truncate.nrelids);
4532  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4533  change->data.truncate.relids = relids;
4534 
4535  break;
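4536  }
4537  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4538  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4539  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4540  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: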
4541  break;
4542  }
4543 
4544  dlist_push_tail(&txn->changes, &change->node);
4545  txn->nentries_mem++;
4546 
4547  /*
4548  * Update memory accounting for the restored change. We need to do this
4549  * although we don't check the memory limit when restoring the changes in
4550  * this branch (we only do that when initially queueing the changes after
4551  * decoding), because we will release the changes later, and that will
4552  * update the accounting too (subtracting the size from the counters). And
4553  * we don't want to underflow there.
4554  */
4555  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4556  ReorderBufferChangeSize(change));
4557 }
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), size, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
TXNEntryFile * file,
XLogSegNo * segno 
)
static

Definition at line 4252 of file reorderbuffer.c.

4254 {
4255  Size restored = 0;
4256  XLogSegNo last_segno;
4257  dlist_mutable_iter cleanup_iter;
4258  File *fd = &file->vfd;
4259 
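4260  Assert(txn->first_lsn != InvalidXLogRecPtr);
4261  Assert(txn->final_lsn != InvalidXLogRecPtr);
4262 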
4263  /* free current entries, so we have memory for more */
4264  dlist_foreach_modify(cleanup_iter, &txn->changes)
4265  {
4266  ReorderBufferChange *cleanup =
4267  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4268 
4269  dlist_delete(&cleanup->node);
4270  ReorderBufferReturnChange(rb, cleanup, true);
4271  }
4272  txn->nentries_mem = 0;
4273  Assert(dlist_is_empty(&txn->changes));
4274 
4275  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4276 
4277  while (restored < max_changes_in_memory && *segno <= last_segno)
4278  {
4279  int readBytes;
4280  ReorderBufferDiskChange *ondisk;
4281 
4282  CHECK_FOR_INTERRUPTS();
4283 
4284  if (*fd == -1)
4285  {
4286  char path[MAXPGPATH];
4287 
4288  /* first time in */
4289  if (*segno == 0)
4290  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4291 
4292  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4293 
4294  /*
4295  * No need to care about TLIs here, only used during a single run,
4296  * so each LSN only maps to a specific WAL record.
4297  */
4298  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4299  *segno);
4300 
4301  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4302 
4303  /* No harm in resetting the offset even in case of failure */
4304  file->curOffset = 0;
4305 
4306  if (*fd < 0 && errno == ENOENT)
4307  {
4308  *fd = -1;
4309  (*segno)++;
4310  continue;
4311  }
4312  else if (*fd < 0)
4313  ereport(ERROR,
4314  (errcode_for_file_access(),
4315  errmsg("could not open file \"%s\": %m",
4316  path)));
4317  }
4318 
4319  /*
4320  * Read the statically sized part of a change which has information
4321  * about the total size. If we couldn't read a record, we're at the
4322  * end of this file.
4323  */
4324  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4325  readBytes = FileRead(file->vfd, rb->outbuf,
4326  sizeof(ReorderBufferDiskChange),
4327  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4328 
4329  /* eof */
4330  if (readBytes == 0)
4331  {
4332  FileClose(*fd);
4333  *fd = -1;
4334  (*segno)++;
4335  continue;
4336  }
4337  else if (readBytes < 0)
4338  ereport(ERROR,
4339  (errcode_for_file_access(),
4340  errmsg("could not read from reorderbuffer spill file: %m")));
4341  else if (readBytes != sizeof(ReorderBufferDiskChange))
4342  ereport(ERROR,
4343  (errcode_for_file_access(),
4344  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4345  readBytes,
4346  (uint32) sizeof(ReorderBufferDiskChange))));
4347 
4348  file->curOffset += readBytes;
4349 
4350  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4351 
4352  ReorderBufferSerializeReserve(rb,
4353  sizeof(ReorderBufferDiskChange) + ondisk->size);
4354  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4355 
4356  readBytes = FileRead(file->vfd,
4357  rb->outbuf + sizeof(ReorderBufferDiskChange),
4358  ondisk->size - sizeof(ReorderBufferDiskChange),
4359  file->curOffset,
4360  WAIT_EVENT_REORDER_BUFFER_READ);
4361 
4362  if (readBytes < 0)
4363  ereport(ERROR,
4364  (errcode_for_file_access(),
4365  errmsg("could not read from reorderbuffer spill file: %m")));
4366  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4367  ereport(ERROR,
4368  (errcode_for_file_access(),
4369  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4370  readBytes,
4371  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4372 
4373  file->curOffset += readBytes;
4374 
4375  /*
4376  * ok, read a full change from disk, now restore it into proper
4377  * in-memory format
4378  */
4379  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4380  restored++;
4381  }
4382 
4383  return restored;
4384 }
static void cleanup(void)
Definition: bootstrap.c:683
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:142
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4563 of file reorderbuffer.c.

4564 {
4565  XLogSegNo first;
4566  XLogSegNo cur;
4567  XLogSegNo last;
4568 
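4569  Assert(txn->first_lsn != InvalidXLogRecPtr);
4570  Assert(txn->final_lsn != InvalidXLogRecPtr);
4571 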
4572  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4573  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4574 
4575  /* iterate over all possible filenames, and delete them */
4576  for (cur = first; cur <= last; cur++)
4577  {
4578  char path[MAXPGPATH];
4579 
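4580  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4581  if (unlink(path) != 0 && errno != ENOENT)
4582  ereport(ERROR,
4583  (errcode_for_file_access(),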
4584  errmsg("could not remove file \"%s\": %m", path)));
4585  }
4586 }
struct cursor * cur
Definition: ecpg.c:28

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * rb,
ReorderBufferChange * change,
bool  upd_mem 
)

Definition at line 495 of file reorderbuffer.c.

497 {
498  /* update memory accounting info */
499  if (upd_mem)
500  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
501  ReorderBufferChangeSize(change));
502 
503  /* free contained data */
504  switch (change->action)
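505  {
506  case REORDER_BUFFER_CHANGE_INSERT:
507  case REORDER_BUFFER_CHANGE_UPDATE:
508  case REORDER_BUFFER_CHANGE_DELETE:
509  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: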
510  if (change->data.tp.newtuple)
511  {
512  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
513  change->data.tp.newtuple = NULL;
514  }
515 
516  if (change->data.tp.oldtuple)
517  {
518  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
519  change->data.tp.oldtuple = NULL;
520  }
521  break;
522  case REORDER_BUFFER_CHANGE_MESSAGE:
523  if (change->data.msg.prefix != NULL)
524  pfree(change->data.msg.prefix);
525  change->data.msg.prefix = NULL;
526  if (change->data.msg.message != NULL)
527  pfree(change->data.msg.message);
528  change->data.msg.message = NULL;
529  break;
530  case REORDER_BUFFER_CHANGE_INVALIDATION:
531  if (change->data.inval.invalidations)
532  pfree(change->data.inval.invalidations);
533  change->data.inval.invalidations = NULL;
534  break;
535  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
536  if (change->data.snapshot)
537  {
538  ReorderBufferFreeSnap(rb, change->data.snapshot);
539  change->data.snapshot = NULL;
540  }
541  break;
542  /* no data in addition to the struct itself */
543  case REORDER_BUFFER_CHANGE_TRUNCATE:
544  if (change->data.truncate.relids != NULL)
545  {
546  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
547  change->data.truncate.relids = NULL;
548  }
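549  break;
550  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
551  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
552  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
553  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: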
554  break;
555  }
556 
557  pfree(change);
558 }
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * rb,
Oid * relids 
)

Definition at line 614 of file reorderbuffer.c.

615 {
616  pfree(relids);
617 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 583 of file reorderbuffer.c.

584 {
585  pfree(tuple);
586 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 438 of file reorderbuffer.c.

439 {
440  /* clean the lookup cache if we were cached (quite likely) */
441  if (rb->by_txn_last_xid == txn->xid)
442  {
443  rb->by_txn_last_xid = InvalidTransactionId;
444  rb->by_txn_last_txn = NULL;
445  }
446 
447  /* free data that's contained */
448 
449  if (txn->gid != NULL)
450  {
451  pfree(txn->gid);
452  txn->gid = NULL;
453  }
454 
455  if (txn->tuplecid_hash != NULL)
456  {
457  hash_destroy(txn->tuplecid_hash);
458  txn->tuplecid_hash = NULL;
459  }
460 
461  if (txn->invalidations)
462  {
463  pfree(txn->invalidations);
464  txn->invalidations = NULL;
465  }
466 
467  /* Reset the toast hash */
468  ReorderBufferToastReset(rb, txn);
469 
470  /* All changes must be deallocated */
471  Assert(txn->size == 0);
472 
473  pfree(txn);
474 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2037 of file reorderbuffer.c.

2039 {
2040  txn->command_id = command_id;
2041 
2042  /* Avoid copying if it's already copied. */
2043  if (snapshot_now->copied)
2044  txn->snapshot_now = snapshot_now;
2045  else
2046  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2047  txn, command_id);
2048 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
int  fd,
ReorderBufferChange * change 
)
static

Definition at line 3800 of file reorderbuffer.c.

3802 {
3803  ReorderBufferDiskChange *ondisk;
3804  Size sz = sizeof(ReorderBufferDiskChange);
3805 
3806  ReorderBufferSerializeReserve(rb, sz);
3807 
3808  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3809  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3810 
3811  switch (change->action)
3812  {
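3813  /* fall through these, they're all similar enough */
3814  case REORDER_BUFFER_CHANGE_INSERT:
3815  case REORDER_BUFFER_CHANGE_UPDATE:
3816  case REORDER_BUFFER_CHANGE_DELETE:
3817  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: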
3818  {
3819  char *data;
3820  HeapTuple oldtup,
3821  newtup;
3822  Size oldlen = 0;
3823  Size newlen = 0;
3824 
3825  oldtup = change->data.tp.oldtuple;
3826  newtup = change->data.tp.newtuple;
3827 
3828  if (oldtup)
3829  {
3830  sz += sizeof(HeapTupleData);
3831  oldlen = oldtup->t_len;
3832  sz += oldlen;
3833  }
3834 
3835  if (newtup)
3836  {
3837  sz += sizeof(HeapTupleData);
3838  newlen = newtup->t_len;
3839  sz += newlen;
3840  }
3841 
3842  /* make sure we have enough space */
3843  ReorderBufferSerializeReserve(rb, sz);
3844 
3845  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3846  /* might have been reallocated above */
3847  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3848 
3849  if (oldlen)
3850  {
3851  memcpy(data, oldtup, sizeof(HeapTupleData));
3852  data += sizeof(HeapTupleData);
3853 
3854  memcpy(data, oldtup->t_data, oldlen);
3855  data += oldlen;
3856  }
3857 
3858  if (newlen)
3859  {
3860  memcpy(data, newtup, sizeof(HeapTupleData));
3861  data += sizeof(HeapTupleData);
3862 
3863  memcpy(data, newtup->t_data, newlen);
3864  data += newlen;
3865  }
3866  break;
3867  }
3868  case REORDER_BUFFER_CHANGE_MESSAGE:
3869  {
3870  char *data;
3871  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3872 
3873  sz += prefix_size + change->data.msg.message_size +
3874  sizeof(Size) + sizeof(Size);
3875  ReorderBufferSerializeReserve(rb, sz);
3876 
3877  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3878 
3879  /* might have been reallocated above */
3880  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3881 
3882  /* write the prefix including the size */
3883  memcpy(data, &prefix_size, sizeof(Size));
3884  data += sizeof(Size);
3885  memcpy(data, change->data.msg.prefix,
3886  prefix_size);
3887  data += prefix_size;
3888 
3889  /* write the message including the size */
3890  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3891  data += sizeof(Size);
3892  memcpy(data, change->data.msg.message,
3893  change->data.msg.message_size);
3894  data += change->data.msg.message_size;
3895 
3896  break;
3897  }
3898  case REORDER_BUFFER_CHANGE_INVALIDATION:
3899  {
3900  char *data;
3901  Size inval_size = sizeof(SharedInvalidationMessage) *
3902  change->data.inval.ninvalidations;
3903 
3904  sz += inval_size;
3905 
3906  ReorderBufferSerializeReserve(rb, sz);
3907  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3908 
3909  /* might have been reallocated above */
3910  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3911  memcpy(data, change->data.inval.invalidations, inval_size);
3912  data += inval_size;
3913 
3914  break;
3915  }
3916  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3917  {
3918  Snapshot snap;
3919  char *data;
3920 
3921  snap = change->data.snapshot;
3922 
3923  sz += sizeof(SnapshotData) +
3924  sizeof(TransactionId) * snap->xcnt +
3925  sizeof(TransactionId) * snap->subxcnt;
3926 
3927  /* make sure we have enough space */
3928  ReorderBufferSerializeReserve(rb, sz);
3929  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3930  /* might have been reallocated above */
3931  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3932 
3933  memcpy(data, snap, sizeof(SnapshotData));
3934  data += sizeof(SnapshotData);
3935 
3936  if (snap->xcnt)
3937  {
3938  memcpy(data, snap->xip,
3939  sizeof(TransactionId) * snap->xcnt);
3940  data += sizeof(TransactionId) * snap->xcnt;
3941  }
3942 
3943  if (snap->subxcnt)
3944  {
3945  memcpy(data, snap->subxip,
3946  sizeof(TransactionId) * snap->subxcnt);
3947  data += sizeof(TransactionId) * snap->subxcnt;
3948  }
3949  break;
3950  }
3951  case REORDER_BUFFER_CHANGE_TRUNCATE:
3952  {
3953  Size size;
3954  char *data;
3955 
3956  /* account for the OIDs of truncated relations */
3957  size = sizeof(Oid) * change->data.truncate.nrelids;
3958  sz += size;
3959 
3960  /* make sure we have enough space */
3961  ReorderBufferSerializeReserve(rb, sz);
3962 
3963  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3964  /* might have been reallocated above */
3965  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3966 
3967  memcpy(data, change->data.truncate.relids, size);
3968  data += size;
3969 
3970  break;
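3971  }
3972  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3973  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3974  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3975  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: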
3976  /* ReorderBufferChange contains everything important */
3977  break;
3978  }
3979 
3980  ondisk->size = sz;
3981 
3982  errno = 0;
3983  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3984  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3985  {
3986  int save_errno = errno;
3987 
3988  CloseTransientFile(fd);
3989 
3990  /* if write didn't set errno, assume problem is no disk space */
3991  errno = save_errno ? save_errno : ENOSPC;
3992  ereport(ERROR,
3993  (errcode_for_file_access(),
3994  errmsg("could not write to data file for XID %u: %m",
3995  txn->xid)));
3996  }
3997  pgstat_report_wait_end();
3998 
3999  /*
4000  * Keep the transaction's final_lsn up to date with each change we send to
4001  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4002  * only do this on commit and abort records, but that doesn't work if a
4003  * system crash leaves a transaction without its abort record).
4004  *
4005  * Make sure not to move it backwards.
4006  */
4007  if (txn->final_lsn < change->lsn)
4008  txn->final_lsn = change->lsn;
4009 
4010  Assert(ondisk->change.action == change->action);
4011 }
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot * slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4632 of file reorderbuffer.c.

4634 {
4635  XLogRecPtr recptr;
4636 
4637  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4638 
4639  snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4640  PG_REPLSLOT_DIR,
4641  NameStr(MyReplicationSlot->data.name),
4642  xid, LSN_FORMAT_ARGS(recptr));
4643 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer * rb,
Size  sz 
)
static

Definition at line 3515 of file reorderbuffer.c.

3516 {
3517  if (!rb->outbufsize)
3518  {
3519  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3520  rb->outbufsize = sz;
3521  }
3522  else if (rb->outbufsize < sz)
3523  {
3524  rb->outbuf = repalloc(rb->outbuf, sz);
3525  rb->outbufsize = sz;
3526  }
3527 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 3705 of file reorderbuffer.c.

3706 {
3707  dlist_iter subtxn_i;
3708  dlist_mutable_iter change_i;
3709  int fd = -1;
3710  XLogSegNo curOpenSegNo = 0;
3711  Size spilled = 0;
3712  Size size = txn->size;
3713 
3714  elog(DEBUG2, "spill %u changes in XID %u to disk",
3715  (uint32) txn->nentries_mem, txn->xid);
3716 
3717  /* do the same to all child TXs */
3718  dlist_foreach(subtxn_i, &txn->subtxns)
3719  {
3720  ReorderBufferTXN *subtxn;
3721 
3722  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3723  ReorderBufferSerializeTXN(rb, subtxn);
3724  }
3725 
3726  /* serialize changestream */
3727  dlist_foreach_modify(change_i, &txn->changes)
3728  {
3729  ReorderBufferChange *change;
3730 
3731  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3732 
3733  /*
3734  * store in segment in which it belongs by start lsn, don't split over
3735  * multiple segments tho
3736  */
3737  if (fd == -1 ||
3738  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3739  {
3740  char path[MAXPGPATH];
3741 
3742  if (fd != -1)
3743  CloseTransientFile(fd);
3744 
3745  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3746 
3747  /*
3748  * No need to care about TLIs here, only used during a single run,
3749  * so each LSN only maps to a specific WAL record.
3750  */
3751  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3752  curOpenSegNo);
3753 
3754  /* open segment, create it if necessary */
3755  fd = OpenTransientFile(path,
3756  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3757 
3758  if (fd < 0)
3759  ereport(ERROR,
3760  (errcode_for_file_access(),
3761  errmsg("could not open file \"%s\": %m", path)));
3762  }
3763 
3764  ReorderBufferSerializeChange(rb, txn, fd, change);
3765  dlist_delete(&change->node);
3766  ReorderBufferReturnChange(rb, change, false);
3767 
3768  spilled++;
3769  }
3770 
3771  /* Update the memory counter */
3772  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3773 
3774  /* update the statistics iff we have spilled anything */
3775  if (spilled)
3776  {
3777  rb->spillCount += 1;
3778  rb->spillBytes += size;
3779 
3780  /* don't consider already serialized transactions */
3781  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3782 
3783  /* update the decoding stats */
3784  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3785  }
3786 
3787  Assert(spilled == txn->nentries_mem);
3788  Assert(dlist_is_empty(&txn->changes));
3789  txn->nentries_mem = 0;
3790  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3791 
3792  if (fd != -1)
3793  CloseTransientFile(fd);
3794 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1932
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), size, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3157 of file reorderbuffer.c.

3159 {
3160  ReorderBufferTXN *txn;
3161  bool is_new;
3162 
3163  Assert(snap != NULL);
3164 
3165  /*
3166  * Fetch the transaction to operate on. If we know it's a subtransaction,
3167  * operate on its top-level transaction instead.
3168  */
3169  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3170  if (rbtxn_is_known_subxact(txn))
3171  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3172  NULL, InvalidXLogRecPtr, false);
3173  Assert(txn->base_snapshot == NULL);
3174 
3175  txn->