PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirm(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirm

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4844 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4845 {
4846  char path[MAXPGPATH];
4847  int fd;
4848  int readBytes;
4850 
4851  sprintf(path, "pg_logical/mappings/%s", fname);
4852  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4853  if (fd < 0)
4854  ereport(ERROR,
4856  errmsg("could not open file \"%s\": %m", path)));
4857 
4858  while (true)
4859  {
4862  ReorderBufferTupleCidEnt *new_ent;
4863  bool found;
4864 
4865  /* be careful about padding */
4866  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4867 
4868  /* read all mappings till the end of the file */
4870  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4872 
4873  if (readBytes < 0)
4874  ereport(ERROR,
4876  errmsg("could not read file \"%s\": %m",
4877  path)));
4878  else if (readBytes == 0) /* EOF */
4879  break;
4880  else if (readBytes != sizeof(LogicalRewriteMappingData))
4881  ereport(ERROR,
4883  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4884  path, readBytes,
4885  (int32) sizeof(LogicalRewriteMappingData))));
4886 
4887  key.relnode = map.old_node;
4888  ItemPointerCopy(&map.old_tid,
4889  &key.tid);
4890 
4891 
4892  ent = (ReorderBufferTupleCidEnt *)
4893  hash_search(tuplecid_data,
4894  (void *) &key,
4895  HASH_FIND,
4896  NULL);
4897 
4898  /* no existing mapping, no need to update */
4899  if (!ent)
4900  continue;
4901 
4902  key.relnode = map.new_node;
4903  ItemPointerCopy(&map.new_tid,
4904  &key.tid);
4905 
4906  new_ent = (ReorderBufferTupleCidEnt *)
4907  hash_search(tuplecid_data,
4908  (void *) &key,
4909  HASH_ENTER,
4910  &found);
4911 
4912  if (found)
4913  {
4914  /*
4915  * Make sure the existing mapping makes sense. We sometime update
4916  * old records that did not yet have a cmax (e.g. pg_class' own
4917  * entry while rewriting it) during rewrites, so allow that.
4918  */
4919  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4920  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4921  }
4922  else
4923  {
4924  /* update mapping */
4925  new_ent->cmin = ent->cmin;
4926  new_ent->cmax = ent->cmax;
4927  new_ent->combocid = ent->combocid;
4928  }
4929  }
4930 
4931  if (CloseTransientFile(fd) != 0)
4932  ereport(ERROR,
4934  errmsg("could not close file \"%s\": %m", path)));
4935 }
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
signed int int32
Definition: c.h:429
#define sprintf
Definition: port.h:218
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2423
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:261
int CloseTransientFile(int fd)
Definition: fd.c:2600
#define InvalidCommandId
Definition: c.h:604
#define ereport(elevel,...)
Definition: elog.h:157
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 915 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

916 {
917 #ifdef USE_ASSERT_CHECKING
918  dlist_iter iter;
919  XLogRecPtr prev_lsn = txn->first_lsn;
920 
921  dlist_foreach(iter, &txn->changes)
922  {
923  ReorderBufferChange *cur_change;
924 
925  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
926 
928  Assert(cur_change->lsn != InvalidXLogRecPtr);
929  Assert(txn->first_lsn <= cur_change->lsn);
930 
931  if (txn->end_lsn != InvalidXLogRecPtr)
932  Assert(cur_change->lsn <= txn->end_lsn);
933 
934  Assert(prev_lsn <= cur_change->lsn);
935 
936  prev_lsn = cur_change->lsn;
937  }
938 #endif
939 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 858 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

859 {
860 #ifdef USE_ASSERT_CHECKING
861  dlist_iter iter;
862  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
863  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
864 
865  dlist_foreach(iter, &rb->toplevel_by_lsn)
866  {
868  iter.cur);
869 
870  /* start LSN must be set */
871  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
872 
873  /* If there is an end LSN, it must be higher than start LSN */
874  if (cur_txn->end_lsn != InvalidXLogRecPtr)
875  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
876 
877  /* Current initial LSN must be strictly higher than previous */
878  if (prev_first_lsn != InvalidXLogRecPtr)
879  Assert(prev_first_lsn < cur_txn->first_lsn);
880 
881  /* known-as-subtxn txns must not be listed */
882  Assert(!rbtxn_is_known_subxact(cur_txn));
883 
884  prev_first_lsn = cur_txn->first_lsn;
885  }
886 
888  {
890  base_snapshot_node,
891  iter.cur);
892 
893  /* base snapshot (and its LSN) must be set */
894  Assert(cur_txn->base_snapshot != NULL);
896 
897  /* current LSN must be strictly higher than previous */
898  if (prev_base_snap_lsn != InvalidXLogRecPtr)
899  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
900 
901  /* known-as-subtxn txns must not be listed */
902  Assert(!rbtxn_is_known_subxact(cur_txn));
903 
904  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
905  }
906 #endif
907 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 4952 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4953 {
4956 
4957  if (a->lsn < b->lsn)
4958  return -1;
4959  else if (a->lsn > b->lsn)
4960  return 1;
4961  return 0;
4962 }
#define lfirst(lc)
Definition: pg_list.h:169

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2791 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2792 {
2793  ReorderBufferTXN *txn;
2794 
2795  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2796  false);
2797 
2798  /* unknown, nothing to remove */
2799  if (txn == NULL)
2800  return;
2801 
2802  /* For streamed transactions notify the remote node about the abort. */
2803  if (rbtxn_is_streamed(txn))
2804  {
2805  rb->stream_abort(rb, txn, lsn);
2806 
2807  /*
2808  * We might have decoded changes for this transaction that could load
2809  * the cache as per the current transaction's view (consider DDL's
2810  * happened in this transaction). We don't want the decoding of future
2811  * transactions to use those cache entries so execute invalidations.
2812  */
2813  if (txn->ninvalidations > 0)
2815  txn->invalidations);
2816  }
2817 
2818  /* cosmetic... */
2819  txn->final_lsn = lsn;
2820 
2821  /* remove potential on-disk data, and deallocate */
2822  ReorderBufferCleanupTXN(rb, txn);
2823 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2833 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2834 {
2835  dlist_mutable_iter it;
2836 
2837  /*
2838  * Iterate through all (potential) toplevel TXNs and abort all that are
2839  * older than what possibly can be running. Once we've found the first
2840  * that is alive we stop, there might be some that acquired an xid earlier
2841  * but started writing later, but it's unlikely and they will be cleaned
2842  * up in a later call to this function.
2843  */
2845  {
2846  ReorderBufferTXN *txn;
2847 
2848  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2849 
2850  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2851  {
2852  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2853 
2854  /* remove potential on-disk data, and deallocate this tx */
2855  ReorderBufferCleanupTXN(rb, txn);
2856  }
2857  else
2858  return;
2859  }
2860 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:232

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3168 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

3171 {
3172  ReorderBufferTXN *txn;
3173  MemoryContext oldcontext;
3174  ReorderBufferChange *change;
3175 
3176  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3177 
3178  oldcontext = MemoryContextSwitchTo(rb->context);
3179 
3180  /*
3181  * Collect all the invalidations under the top transaction so that we can
3182  * execute them all together. See comment atop this function
3183  */
3184  if (txn->toptxn)
3185  txn = txn->toptxn;
3186 
3187  Assert(nmsgs > 0);
3188 
3189  /* Accumulate invalidations. */
3190  if (txn->ninvalidations == 0)
3191  {
3192  txn->ninvalidations = nmsgs;
3194  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3195  memcpy(txn->invalidations, msgs,
3196  sizeof(SharedInvalidationMessage) * nmsgs);
3197  }
3198  else
3199  {
3202  (txn->ninvalidations + nmsgs));
3203 
3204  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3205  nmsgs * sizeof(SharedInvalidationMessage));
3206  txn->ninvalidations += nmsgs;
3207  }
3208 
3209  change = ReorderBufferGetChange(rb);
3211  change->data.inval.ninvalidations = nmsgs;
3212  change->data.inval.invalidations = (SharedInvalidationMessage *)
3213  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3214  memcpy(change->data.inval.invalidations, msgs,
3215  sizeof(SharedInvalidationMessage) * nmsgs);
3216 
3217  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3218 
3219  MemoryContextSwitchTo(oldcontext);
3220 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3047 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

3049 {
3051 
3052  change->data.command_id = cid;
3054 
3055  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3056 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3132 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

3136 {
3138  ReorderBufferTXN *txn;
3139 
3140  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3141 
3142  change->data.tuplecid.node = node;
3143  change->data.tuplecid.tid = tid;
3144  change->data.tuplecid.cmin = cmin;
3145  change->data.tuplecid.cmax = cmax;
3146  change->data.tuplecid.combocid = combocid;
3147  change->lsn = lsn;
3148  change->txn = txn;
3150 
3151  dlist_push_tail(&txn->tuplecids, &change->node);
3152  txn->ntuplecids++;
3153 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@101 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2998 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

3000 {
3002 
3003  change->data.snapshot = snap;
3005 
3006  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3007 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 299 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

300 {
301  ReorderBuffer *buffer;
302  HASHCTL hash_ctl;
303  MemoryContext new_ctx;
304 
305  Assert(MyReplicationSlot != NULL);
306 
307  /* allocate memory in own context, to have better accountability */
309  "ReorderBuffer",
311 
312  buffer =
313  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
314 
315  memset(&hash_ctl, 0, sizeof(hash_ctl));
316 
317  buffer->context = new_ctx;
318 
319  buffer->change_context = SlabContextCreate(new_ctx,
320  "Change",
322  sizeof(ReorderBufferChange));
323 
324  buffer->txn_context = SlabContextCreate(new_ctx,
325  "TXN",
327  sizeof(ReorderBufferTXN));
328 
329  buffer->tup_context = GenerationContextCreate(new_ctx,
330  "Tuples",
332 
333  hash_ctl.keysize = sizeof(TransactionId);
334  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
335  hash_ctl.hcxt = buffer->context;
336 
337  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
339 
341  buffer->by_txn_last_txn = NULL;
342 
343  buffer->outbuf = NULL;
344  buffer->outbufsize = 0;
345  buffer->size = 0;
346 
347  buffer->spillTxns = 0;
348  buffer->spillCount = 0;
349  buffer->spillBytes = 0;
350  buffer->streamTxns = 0;
351  buffer->streamCount = 0;
352  buffer->streamBytes = 0;
353  buffer->totalTxns = 0;
354  buffer->totalBytes = 0;
355 
357 
358  dlist_init(&buffer->toplevel_by_lsn);
360 
361  /*
362  * Ensure there's no stale data from prior uses of this slot, in case some
363  * prior exit avoided calling ReorderBufferFree. Failure to do this can
364  * produce duplicated txns, and it's very cheap if there's nothing there.
365  */
367 
368  return buffer;
369 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:76
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
ReplicationSlotPersistentData data
Definition: slot.h:156
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:197
MemoryContext context
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:75
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:224
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define NameStr(name)
Definition: c.h:681
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1897 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1900 {
1901  if (streaming)
1902  rb->stream_change(rb, txn, relation, change);
1903  else
1904  rb->apply_change(rb, txn, relation, change);
1905 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1925 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1927 {
1928  if (streaming)
1929  rb->stream_message(rb, txn, change->lsn, true,
1930  change->data.msg.prefix,
1931  change->data.msg.message_size,
1932  change->data.msg.message);
1933  else
1934  rb->message(rb, txn, change->lsn, true,
1935  change->data.msg.prefix,
1936  change->data.msg.message_size,
1937  change->data.msg.message);
1938 }
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1911 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1914 {
1915  if (streaming)
1916  rb->stream_truncate(rb, txn, nrelations, relations, change);
1917  else
1918  rb->apply_truncate(rb, txn, nrelations, relations, change);
1919 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1001 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

1003 {
1004  ReorderBufferTXN *txn;
1005  ReorderBufferTXN *subtxn;
1006  bool new_top;
1007  bool new_sub;
1008 
1009  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1010  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1011 
1012  if (!new_sub)
1013  {
1014  if (rbtxn_is_known_subxact(subtxn))
1015  {
1016  /* already associated, nothing to do */
1017  return;
1018  }
1019  else
1020  {
1021  /*
1022  * We already saw this transaction, but initially added it to the
1023  * list of top-level txns. Now that we know it's not top-level,
1024  * remove it from there.
1025  */
1026  dlist_delete(&subtxn->node);
1027  }
1028  }
1029 
1030  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1031  subtxn->toplevel_xid = xid;
1032  Assert(subtxn->nsubtxns == 0);
1033 
1034  /* set the reference to top-level transaction */
1035  subtxn->toptxn = txn;
1036 
1037  /* add to subtransaction list */
1038  dlist_push_tail(&txn->subtxns, &subtxn->node);
1039  txn->nsubtxns++;
1040 
1041  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1042  ReorderBufferTransferSnapToParent(txn, subtxn);
1043 
1044  /* Verify LSN-ordering invariant */
1045  AssertTXNLsnOrder(rb);
1046 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1660 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1661 {
1662  dlist_iter iter;
1663  HASHCTL hash_ctl;
1664 
1666  return;
1667 
1668  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1669  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1670  hash_ctl.hcxt = rb->context;
1671 
1672  /*
1673  * create the hash with the exact number of to-be-stored tuplecids from
1674  * the start
1675  */
1676  txn->tuplecid_hash =
1677  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1679 
1680  dlist_foreach(iter, &txn->tuplecids)
1681  {
1684  bool found;
1685  ReorderBufferChange *change;
1686 
1687  change = dlist_container(ReorderBufferChange, node, iter.cur);
1688 
1690 
1691  /* be careful about padding */
1692  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1693 
1694  key.relnode = change->data.tuplecid.node;
1695 
1696  ItemPointerCopy(&change->data.tuplecid.tid,
1697  &key.tid);
1698 
1699  ent = (ReorderBufferTupleCidEnt *)
1701  (void *) &key,
1703  &found);
1704  if (!found)
1705  {
1706  ent->cmin = change->data.tuplecid.cmin;
1707  ent->cmax = change->data.tuplecid.cmax;
1708  ent->combocid = change->data.tuplecid.combocid;
1709  }
1710  else
1711  {
1712  /*
1713  * Maybe we already saw this tuple before in this transaction, but
1714  * if so it must have the same cmin.
1715  */
1716  Assert(ent->cmin == change->data.tuplecid.cmin);
1717 
1718  /*
1719  * cmax may be initially invalid, but once set it can only grow,
1720  * and never become invalid again.
1721  */
1722  Assert((ent->cmax == InvalidCommandId) ||
1723  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1724  (change->data.tuplecid.cmax > ent->cmax)));
1725  ent->cmax = change->data.tuplecid.cmax;
1726  }
1727  }
1728 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
Size entrysize
Definition: hsearch.h:76
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
union ReorderBufferChange::@97 data
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext context
#define InvalidCommandId
Definition: c.h:604
Size keysize
Definition: hsearch.h:75
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
struct ReorderBufferChange::@97::@101 tuplecid
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3803 of file reorderbuffer.c.

References XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3804 {
3806  SnapBuild *builder = ctx->snapshot_builder;
3807 
3808  /* We can't start streaming unless a consistent state is reached. */
3810  return false;
3811 
3812  /*
3813  * We can't start streaming immediately even if the streaming is enabled
3814  * because we previously decoded this transaction and now just are
3815  * restarting.
3816  */
3817  if (ReorderBufferCanStream(rb) &&
3818  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3819  return true;
3820 
3821  return false;
3822 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:385
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 3072 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

3075 {
3076  Size sz;
3077  ReorderBufferTXN *txn;
3078  ReorderBufferTXN *toptxn;
3079 
3080  Assert(change->txn);
3081 
3082  /*
3083  * Ignore tuple CID changes, because those are not evicted when reaching
3084  * memory limit. So we just don't count them, because it might easily
3085  * trigger a pointless attempt to spill.
3086  */
3088  return;
3089 
3090  txn = change->txn;
3091 
3092  /*
3093  * Update the total size in top level as well. This is later used to
3094  * compute the decoding stats.
3095  */
3096  if (txn->toptxn != NULL)
3097  toptxn = txn->toptxn;
3098  else
3099  toptxn = txn;
3100 
3101  sz = ReorderBufferChangeSize(change);
3102 
3103  if (addition)
3104  {
3105  txn->size += sz;
3106  rb->size += sz;
3107 
3108  /* Update the total size in the top transaction. */
3109  toptxn->total_size += sz;
3110  }
3111  else
3112  {
3113  Assert((rb->size >= sz) && (txn->size >= sz));
3114  txn->size -= sz;
3115  rb->size -= sz;
3116 
3117  /* Update the total size in the top transaction. */
3118  toptxn->total_size -= sz;
3119  }
3120 
3121  Assert(txn->size <= rb->size);
3122 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 3946 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

3947 {
3948  Size sz = sizeof(ReorderBufferChange);
3949 
3950  switch (change->action)
3951  {
3952  /* fall through these, they're all similar enough */
3957  {
3958  ReorderBufferTupleBuf *oldtup,
3959  *newtup;
3960  Size oldlen = 0;
3961  Size newlen = 0;
3962 
3963  oldtup = change->data.tp.oldtuple;
3964  newtup = change->data.tp.newtuple;
3965 
3966  if (oldtup)
3967  {
3968  sz += sizeof(HeapTupleData);
3969  oldlen = oldtup->tuple.t_len;
3970  sz += oldlen;
3971  }
3972 
3973  if (newtup)
3974  {
3975  sz += sizeof(HeapTupleData);
3976  newlen = newtup->tuple.t_len;
3977  sz += newlen;
3978  }
3979 
3980  break;
3981  }
3983  {
3984  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3985 
3986  sz += prefix_size + change->data.msg.message_size +
3987  sizeof(Size) + sizeof(Size);
3988 
3989  break;
3990  }
3992  {
3993  sz += sizeof(SharedInvalidationMessage) *
3994  change->data.inval.ninvalidations;
3995  break;
3996  }
3998  {
3999  Snapshot snap;
4000 
4001  snap = change->data.snapshot;
4002 
4003  sz += sizeof(SnapshotData) +
4004  sizeof(TransactionId) * snap->xcnt +
4005  sizeof(TransactionId) * snap->subxcnt;
4006 
4007  break;
4008  }
4010  {
4011  sz += sizeof(Oid) * change->data.truncate.nrelids;
4012 
4013  break;
4014  }
4018  /* ReorderBufferChange contains everything important */
4019  break;
4020  }
4021 
4022  return sz;
4023 }
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3423 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3424 {
3425  ReorderBufferTXN *txn;
3426 
3427  /* bail out if we haven't exceeded the memory limit */
3428  if (rb->size < logical_decoding_work_mem * 1024L)
3429  return;
3430 
3431  /*
3432  * Loop until we reach under the memory limit. One might think that just
3433  * by evicting the largest (sub)transaction we will come under the memory
3434  * limit based on assumption that the selected transaction is at least as
3435  * large as the most recent change (which caused us to go over the memory
3436  * limit). However, that is not true because a user can reduce the
3437  * logical_decoding_work_mem to a smaller value before the most recent
3438  * change.
3439  */
3440  while (rb->size >= logical_decoding_work_mem * 1024L)
3441  {
3442  /*
3443  * Pick the largest transaction (or subtransaction) and evict it from
3444  * memory by streaming, if possible. Otherwise, spill to disk.
3445  */
3447  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3448  {
3449  /* we know there has to be one, because the size is not zero */
3450  Assert(txn && !txn->toptxn);
3451  Assert(txn->total_size > 0);
3452  Assert(rb->size >= txn->total_size);
3453 
3454  ReorderBufferStreamTXN(rb, txn);
3455  }
3456  else
3457  {
3458  /*
3459  * Pick the largest transaction (or subtransaction) and evict it
3460  * from memory by serializing it to disk.
3461  */
3462  txn = ReorderBufferLargestTXN(rb);
3463 
3464  /* we know there has to be one, because the size is not zero */
3465  Assert(txn);
3466  Assert(txn->size > 0);
3467  Assert(rb->size >= txn->size);
3468 
3469  ReorderBufferSerializeTXN(rb, txn);
3470  }
3471 
3472  /*
3473  * After eviction, the transaction should have no entries in memory,
3474  * and should use 0 bytes for changes.
3475  */
3476  Assert(txn->size == 0);
3477  Assert(txn->nentries_mem == 0);
3478  }
3479 
3480  /* We must be under the memory limit now. */
3481  Assert(rb->size < logical_decoding_work_mem * 1024L);
3482 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4367 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

4368 {
4369  DIR *spill_dir;
4370  struct dirent *spill_de;
4371  struct stat statbuf;
4372  char path[MAXPGPATH * 2 + 12];
4373 
4374  sprintf(path, "pg_replslot/%s", slotname);
4375 
4376  /* we're only handling directories here, skip if it's not ours */
4377  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4378  return;
4379 
4380  spill_dir = AllocateDir(path);
4381  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4382  {
4383  /* only look at names that can be ours */
4384  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4385  {
4386  snprintf(path, sizeof(path),
4387  "pg_replslot/%s/%s", slotname,
4388  spill_de->d_name);
4389 
4390  if (unlink(path) != 0)
4391  ereport(ERROR,
4393  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4394  path, slotname)));
4395  }
4396  }
4397  FreeDir(spill_dir);
4398 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2715
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:218
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2634
#define ereport(elevel,...)
Definition: elog.h:157
#define S_ISDIR(m)
Definition: win32_port.h:316
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:909
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:216
int FreeDir(DIR *dir)
Definition: fd.c:2752

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1437 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

1438 {
1439  bool found;
1440  dlist_mutable_iter iter;
1441 
1442  /* cleanup subtransactions & their changes */
1443  dlist_foreach_modify(iter, &txn->subtxns)
1444  {
1445  ReorderBufferTXN *subtxn;
1446 
1447  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1448 
1449  /*
1450  * Subtransactions are always associated to the toplevel TXN, even if
1451  * they originally were happening inside another subtxn, so we won't
1452  * ever recurse more than one level deep here.
1453  */
1454  Assert(rbtxn_is_known_subxact(subtxn));
1455  Assert(subtxn->nsubtxns == 0);
1456 
1457  ReorderBufferCleanupTXN(rb, subtxn);
1458  }
1459 
1460  /* cleanup changes in the txn */
1461  dlist_foreach_modify(iter, &txn->changes)
1462  {
1463  ReorderBufferChange *change;
1464 
1465  change = dlist_container(ReorderBufferChange, node, iter.cur);
1466 
1467  /* Check we're not mixing changes from different transactions. */
1468  Assert(change->txn == txn);
1469 
1470  ReorderBufferReturnChange(rb, change, true);
1471  }
1472 
1473  /*
1474  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1475  * They are always stored in the toplevel transaction.
1476  */
1477  dlist_foreach_modify(iter, &txn->tuplecids)
1478  {
1479  ReorderBufferChange *change;
1480 
1481  change = dlist_container(ReorderBufferChange, node, iter.cur);
1482 
1483  /* Check we're not mixing changes from different transactions. */
1484  Assert(change->txn == txn);
1486 
1487  ReorderBufferReturnChange(rb, change, true);
1488  }
1489 
1490  /*
1491  * Cleanup the base snapshot, if set.
1492  */
1493  if (txn->base_snapshot != NULL)
1494  {
1497  }
1498 
1499  /*
1500  * Cleanup the snapshot for the last streamed run.
1501  */
1502  if (txn->snapshot_now != NULL)
1503  {
1504  Assert(rbtxn_is_streamed(txn));
1506  }
1507 
1508  /*
1509  * Remove TXN from its containing list.
1510  *
1511  * Note: if txn is known as subxact, we are deleting the TXN from its
1512  * parent's list of known subxacts; this leaves the parent's nsubxacts
1513  * count too high, but we don't care. Otherwise, we are deleting the TXN
1514  * from the LSN-ordered list of toplevel TXNs.
1515  */
1516  dlist_delete(&txn->node);
1517 
1518  /* now remove reference from buffer */
1519  hash_search(rb->by_txn,
1520  (void *) &txn->xid,
1521  HASH_REMOVE,
1522  &found);
1523  Assert(found);
1524 
1525  /* remove entries spilled to disk */
1526  if (rbtxn_is_serialized(txn))
1527  ReorderBufferRestoreCleanup(rb, txn);
1528 
1529  /* deallocate */
1530  ReorderBufferReturnTXN(rb, txn);
1531 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:804
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:409
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2601 of file reorderbuffer.c.

References InvalidXLogRecPtr, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2605 {
2606  ReorderBufferTXN *txn;
2607 
2608  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2609  false);
2610 
2611  /* unknown transaction, nothing to replay */
2612  if (txn == NULL)
2613  return;
2614 
2615  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2616  origin_id, origin_lsn);
2617 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1121 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

1124 {
1125  ReorderBufferTXN *subtxn;
1126 
1127  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1128  InvalidXLogRecPtr, false);
1129 
1130  /*
1131  * No need to do anything if that subtxn didn't contain any changes
1132  */
1133  if (!subtxn)
1134  return;
1135 
1136  subtxn->final_lsn = commit_lsn;
1137  subtxn->end_lsn = end_lsn;
1138 
1139  /*
1140  * Assign this subxact as a child of the toplevel xact (no-op if already
1141  * done.)
1142  */
1143  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1144 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1736 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1738 {
1739  Snapshot snap;
1740  dlist_iter iter;
1741  int i = 0;
1742  Size size;
1743 
1744  size = sizeof(SnapshotData) +
1745  sizeof(TransactionId) * orig_snap->xcnt +
1746  sizeof(TransactionId) * (txn->nsubtxns + 1);
1747 
1748  snap = MemoryContextAllocZero(rb->context, size);
1749  memcpy(snap, orig_snap, sizeof(SnapshotData));
1750 
1751  snap->copied = true;
1752  snap->active_count = 1; /* mark as active so nobody frees it */
1753  snap->regd_count = 0;
1754  snap->xip = (TransactionId *) (snap + 1);
1755 
1756  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1757 
1758  /*
1759  * snap->subxip contains all txids that belong to our transaction which we
1760  * need to check via cmin/cmax. That's why we store the toplevel
1761  * transaction in there as well.
1762  */
1763  snap->subxip = snap->xip + snap->xcnt;
1764  snap->subxip[i++] = txn->xid;
1765 
1766  /*
1767  * subxcnt isn't decreased when subtransactions abort, so count manually.
1768  * Since it's an upper boundary it is safe to use it for the allocation
1769  * above.
1770  */
1771  snap->subxcnt = 1;
1772 
1773  dlist_foreach(iter, &txn->subtxns)
1774  {
1775  ReorderBufferTXN *sub_txn;
1776 
1777  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1778  snap->subxip[i++] = sub_txn->xid;
1779  snap->subxcnt++;
1780  }
1781 
1782  /* sort so we can bsearch() later */
1783  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1784 
1785  /* store the specified current CommandId */
1786  snap->curcid = cid;
1787 
1788  return snap;
1789 }
uint32 TransactionId
Definition: c.h:587
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:540
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:504
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3227 of file reorderbuffer.c.

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

3228 {
3229  int i;
3230 
3231  for (i = 0; i < nmsgs; i++)
3233 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:562
int i

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  initial_consistent_point,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2707 of file reorderbuffer.c.

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeAbort(), and DecodeCommit().

2712 {
2713  ReorderBufferTXN *txn;
2714  XLogRecPtr prepare_end_lsn;
2715  TimestampTz prepare_time;
2716 
2717  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2718 
2719  /* unknown transaction, nothing to do */
2720  if (txn == NULL)
2721  return;
2722 
2723  /*
2724  * By this time the txn has the prepare record information, remember it to
2725  * be later used for rollback.
2726  */
2727  prepare_end_lsn = txn->end_lsn;
2728  prepare_time = txn->commit_time;
2729 
2730  /* add the gid in the txn */
2731  txn->gid = pstrdup(gid);
2732 
2733  /*
2734  * It is possible that this transaction is not decoded at prepare time
2735  * either because by that time we didn't have a consistent snapshot or it
2736  * was decoded earlier but we have restarted. We only need to send the
2737  * prepare if it was not decoded earlier. We don't need to decode the xact
2738  * for aborts if it is not done already.
2739  */
2740  if ((txn->final_lsn < initial_consistent_point) && is_commit)
2741  {
2742  txn->txn_flags |= RBTXN_PREPARE;
2743 
2744  /*
2745  * The prepare info must have been updated in txn even if we skip
2746  * prepare.
2747  */
2749 
2750  /*
2751  * By this time the txn has the prepare record information and it is
2752  * important to use that so that downstream gets the accurate
2753  * information. If instead, we have passed commit information here
2754  * then downstream can behave as it has already replayed commit
2755  * prepared after the restart.
2756  */
2757  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2758  txn->commit_time, txn->origin_id, txn->origin_lsn);
2759  }
2760 
2761  txn->final_lsn = commit_lsn;
2762  txn->end_lsn = end_lsn;
2763  txn->commit_time = commit_time;
2764  txn->origin_id = origin_id;
2765  txn->origin_lsn = origin_lsn;
2766 
2767  if (is_commit)
2768  rb->commit_prepared(rb, txn, commit_lsn);
2769  else
2770  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2771 
2772  /* cleanup: make sure there's no cache pollution */
2774  txn->invalidations);
2775  ReorderBufferCleanupTXN(rb, txn);
2776 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
ReorderBufferRollbackPreparedCB rollback_prepared
SharedInvalidationMessage * invalidations
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
ReorderBufferCommitPreparedCB commit_prepared

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2876 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2877 {
2878  ReorderBufferTXN *txn;
2879 
2880  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2881  false);
2882 
2883  /* unknown, nothing to forget */
2884  if (txn == NULL)
2885  return;
2886 
2887  /* For streamed transactions notify the remote node about the abort. */
2888  if (rbtxn_is_streamed(txn))
2889  rb->stream_abort(rb, txn, lsn);
2890 
2891  /* cosmetic... */
2892  txn->final_lsn = lsn;
2893 
2894  /*
2895  * Process cache invalidation messages if there are any. Even if we're not
2896  * interested in the transaction's contents, it could have manipulated the
2897  * catalog and we need to update the caches according to that.
2898  */
2899  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2901  txn->invalidations);
2902  else
2903  Assert(txn->ninvalidations == 0);
2904 
2905  /* remove potential on-disk data, and deallocate */
2906  ReorderBufferCleanupTXN(rb, txn);
2907 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 375 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

376 {
377  MemoryContext context = rb->context;
378 
379  /*
380  * We free separately allocated data by entirely scrapping reorderbuffer's
381  * memory context.
382  */
383  MemoryContextDelete(context);
384 
385  /* Free disk space used by unconsumed reorder buffers */
387 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
ReplicationSlotPersistentData data
Definition: slot.h:156
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:681
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1795 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1796 {
1797  if (snap->copied)
1798  pfree(snap);
1799  else
1801 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1169
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:409

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 946 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

947 {
948  ReorderBufferTXN *txn;
949 
950  AssertTXNLsnOrder(rb);
951 
953  return NULL;
954 
956 
959  return txn;
960 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 974 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

975 {
976  ReorderBufferTXN *txn;
977 
978  AssertTXNLsnOrder(rb);
979 
981  return InvalidTransactionId;
982 
983  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
985  return txn->base_snapshot->xmin;
986 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 572 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

573 {
574  Oid *relids;
575  Size alloc_len;
576 
577  alloc_len = sizeof(Oid) * nrelids;
578 
579  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
580 
581  return relids;
582 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 536 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

537 {
538  ReorderBufferTupleBuf *tuple;
539  Size alloc_len;
540 
541  alloc_len = tuple_len + SizeofHeapTupleHeader;
542 
543  tuple = (ReorderBufferTupleBuf *)
545  sizeof(ReorderBufferTupleBuf) +
546  MAXIMUM_ALIGNOF + alloc_len);
547  tuple->alloc_tuple_size = alloc_len;
548  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
549 
550  return tuple;
551 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 393 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

394 {
395  ReorderBufferTXN *txn;
396 
397  txn = (ReorderBufferTXN *)
399 
400  memset(txn, 0, sizeof(ReorderBufferTXN));
401 
402  dlist_init(&txn->changes);
403  dlist_init(&txn->tuplecids);
404  dlist_init(&txn->subtxns);
405 
406  /* InvalidCommandId is not zero, so set it explicitly */
408  txn->output_plugin_private = NULL;
409 
410  return txn;
411 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:604
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
void * output_plugin_private
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2949 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), ReorderBufferForget(), and ReorderBufferInvalidate().

2951 {
2952  bool use_subtxn = IsTransactionOrTransactionBlock();
2953  int i;
2954 
2955  if (use_subtxn)
2956  BeginInternalSubTransaction("replay");
2957 
2958  /*
2959  * Force invalidations to happen outside of a valid transaction - that way
2960  * entries will just be marked as invalid without accessing the catalog.
2961  * That's advantageous because we don't need to setup the full state
2962  * necessary for catalog access.
2963  */
2964  if (use_subtxn)
2966 
2967  for (i = 0; i < ninvalidations; i++)
2968  LocalExecuteInvalidationMessage(&invalidations[i]);
2969 
2970  if (use_subtxn)
2972 }
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:562
int i

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2918 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2919 {
2920  ReorderBufferTXN *txn;
2921 
2922  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2923  false);
2924 
2925  /* unknown, nothing to do */
2926  if (txn == NULL)
2927  return;
2928 
2929  /*
2930  * Process cache invalidation messages if there are any. Even if we're not
2931  * interested in the transaction's contents, it could have manipulated the
2932  * catalog and we need to update the caches according to that.
2933  */
2934  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2936  txn->invalidations);
2937  else
2938  Assert(txn->ninvalidations == 0);
2939 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1163 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1164 {
1166  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1167  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1168 
1169  if (pos_a < pos_b)
1170  return 1;
1171  else if (pos_a == pos_b)
1172  return 0;
1173  return -1;
1174 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:516
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1406 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1408 {
1409  int32 off;
1410 
1411  for (off = 0; off < state->nr_txns; off++)
1412  {
1413  if (state->entries[off].file.vfd != -1)
1414  FileClose(state->entries[off].file.vfd);
1415  }
1416 
1417  /* free memory we might have "leaked" in the last *Next call */
1418  if (!dlist_is_empty(&state->old_change))
1419  {
1420  ReorderBufferChange *change;
1421 
1422  change = dlist_container(ReorderBufferChange, node,
1423  dlist_pop_head_node(&state->old_change));
1424  ReorderBufferReturnChange(rb, change, true);
1425  Assert(dlist_is_empty(&state->old_change));
1426  }
1427 
1428  binaryheap_free(state->heap);
1429  pfree(state);
1430 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
void FileClose(File file)
Definition: fd.c:1873
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1186 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1188 {
1189  Size nr_txns = 0;
1191  dlist_iter cur_txn_i;
1192  int32 off;
1193 
1194  *iter_state = NULL;
1195 
1196  /* Check ordering of changes in the toplevel transaction. */
1197  AssertChangeLsnOrder(txn);
1198 
1199  /*
1200  * Calculate the size of our heap: one element for every transaction that
1201  * contains changes. (Besides the transactions already in the reorder
1202  * buffer, we count the one we were directly passed.)
1203  */
1204  if (txn->nentries > 0)
1205  nr_txns++;
1206 
1207  dlist_foreach(cur_txn_i, &txn->subtxns)
1208  {
1209  ReorderBufferTXN *cur_txn;
1210 
1211  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1212 
1213  /* Check ordering of changes in this subtransaction. */
1214  AssertChangeLsnOrder(cur_txn);
1215 
1216  if (cur_txn->nentries > 0)
1217  nr_txns++;
1218  }
1219 
1220  /* allocate iteration state */
1221  state = (ReorderBufferIterTXNState *)
1223  sizeof(ReorderBufferIterTXNState) +
1224  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1225 
1226  state->nr_txns = nr_txns;
1227  dlist_init(&state->old_change);
1228 
1229  for (off = 0; off < state->nr_txns; off++)
1230  {
1231  state->entries[off].file.vfd = -1;
1232  state->entries[off].segno = 0;
1233  }
1234 
1235  /* allocate heap */
1236  state->heap = binaryheap_allocate(state->nr_txns,
1238  state);
1239 
1240  /* Now that the state fields are initialized, it is safe to return it. */
1241  *iter_state = state;
1242 
1243  /*
1244  * Now insert items into the binary heap, in an unordered fashion. (We
1245  * will run a heap assembly step at the end; this is more efficient.)
1246  */
1247 
1248  off = 0;
1249 
1250  /* add toplevel transaction if it contains changes */
1251  if (txn->nentries > 0)
1252  {
1253  ReorderBufferChange *cur_change;
1254 
1255  if (rbtxn_is_serialized(txn))
1256  {
1257  /* serialize remaining changes */
1258  ReorderBufferSerializeTXN(rb, txn);
1259  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1260  &state->entries[off].segno);
1261  }
1262 
1263  cur_change = dlist_head_element(ReorderBufferChange, node,
1264  &txn->changes);
1265 
1266  state->entries[off].lsn = cur_change->lsn;
1267  state->entries[off].change = cur_change;
1268  state->entries[off].txn = txn;
1269 
1271  }
1272 
1273  /* add subtransactions if they contain changes */
1274  dlist_foreach(cur_txn_i, &txn->subtxns)
1275  {
1276  ReorderBufferTXN *cur_txn;
1277 
1278  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1279 
1280  if (cur_txn->nentries > 0)
1281  {
1282  ReorderBufferChange *cur_change;
1283 
1284  if (rbtxn_is_serialized(cur_txn))
1285  {
1286  /* serialize remaining changes */
1287  ReorderBufferSerializeTXN(rb, cur_txn);
1288  ReorderBufferRestoreChanges(rb, cur_txn,
1289  &state->entries[off].file,
1290  &state->entries[off].segno);
1291  }
1292  cur_change = dlist_head_element(ReorderBufferChange, node,
1293  &cur_txn->changes);
1294 
1295  state->entries[off].lsn = cur_change->lsn;
1296  state->entries[off].change = cur_change;
1297  state->entries[off].txn = cur_txn;
1298 
1300  }
1301  }
1302 
1303  /* assemble a valid binary heap */
1304  binaryheap_build(state->heap);
1305 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:317
size_t Size
Definition: c.h:540
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:523
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1314 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1315 {
1316  ReorderBufferChange *change;
1318  int32 off;
1319 
1320  /* nothing there anymore */
1321  if (state->heap->bh_size == 0)
1322  return NULL;
1323 
1324  off = DatumGetInt32(binaryheap_first(state->heap));
1325  entry = &state->entries[off];
1326 
1327  /* free memory we might have "leaked" in the previous *Next call */
1328  if (!dlist_is_empty(&state->old_change))
1329  {
1330  change = dlist_container(ReorderBufferChange, node,
1331  dlist_pop_head_node(&state->old_change));
1332  ReorderBufferReturnChange(rb, change, true);
1333  Assert(dlist_is_empty(&state->old_change));
1334  }
1335 
1336  change = entry->change;
1337 
1338  /*
1339  * update heap with information about which transaction has the next
1340  * relevant change in LSN order
1341  */
1342 
1343  /* there are in-memory changes */
1344  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1345  {
1346  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1347  ReorderBufferChange *next_change =
1348  dlist_container(ReorderBufferChange, node, next);
1349 
1350  /* txn stays the same */
1351  state->entries[off].lsn = next_change->lsn;
1352  state->entries[off].change = next_change;
1353 
1355  return change;
1356  }
1357 
1358  /* try to load changes from disk */
1359  if (entry->txn->nentries != entry->txn->nentries_mem)
1360  {
1361  /*
1362  * Ugly: restoring changes will reuse *Change records, thus delete the
1363  * current one from the per-tx list and only free in the next call.
1364  */
1365  dlist_delete(&change->node);
1366  dlist_push_tail(&state->old_change, &change->node);
1367 
1368  /*
1369  * Update the total bytes processed by the txn for which we are
1370  * releasing the current set of changes and restoring the new set of
1371  * changes.
1372  */
1373  rb->totalBytes += entry->txn->size;
1374  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1375  &state->entries[off].segno))
1376  {
1377  /* successfully restored changes from disk */
1378  ReorderBufferChange *next_change =
1380  &entry->txn->changes);
1381 
1382  elog(DEBUG2, "restored %u/%u changes from disk",
1383  (uint32) entry->txn->nentries_mem,
1384  (uint32) entry->txn->nentries);
1385 
1386  Assert(entry->txn->nentries_mem);
1387  /* txn stays the same */
1388  state->entries[off].lsn = next_change->lsn;
1389  state->entries[off].change = next_change;
1391 
1392  return change;
1393  }
1394  }
1395 
1396  /* ok, no changes there anymore, remove */
1397  binaryheap_remove_first(state->heap);
1398 
1399  return change;
1400 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:516
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:429
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:523
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:232
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3383 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_incomplete_tuple, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

3384 {
3385  dlist_iter iter;
3386  Size largest_size = 0;
3387  ReorderBufferTXN *largest = NULL;
3388 
3389  /* Find the largest top-level transaction having a base snapshot. */
3391  {
3392  ReorderBufferTXN *txn;
3393 
3394  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3395 
3396  /* must not be a subtxn */
3398  /* base_snapshot must be set */
3399  Assert(txn->base_snapshot != NULL);
3400 
3401  if ((largest == NULL || txn->total_size > largest_size) &&
3402  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3403  {
3404  largest = txn;
3405  largest_size = txn->total_size;
3406  }
3407  }
3408 
3409  return largest;
3410 }
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define rbtxn_has_incomplete_tuple(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3336 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3337 {
3338  HASH_SEQ_STATUS hash_seq;
3340  ReorderBufferTXN *largest = NULL;
3341 
3342  hash_seq_init(&hash_seq, rb->by_txn);
3343  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3344  {
3345  ReorderBufferTXN *txn = ent->txn;
3346 
3347  /* if the current transaction is larger, remember it */
3348  if ((!largest) || (txn->size > largest->size))
3349  largest = txn;
3350  }
3351 
3352  Assert(largest);
3353  Assert(largest->size > 0);
3354  Assert(largest->size <= rb->size);
3355 
3356  return largest;
3357 }
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
ReorderBufferTXN * txn

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2670 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

2672 {
2673  ReorderBufferTXN *txn;
2674 
2675  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2676  false);
2677 
2678  /* unknown transaction, nothing to replay */
2679  if (txn == NULL)
2680  return;
2681 
2682  txn->txn_flags |= RBTXN_PREPARE;
2683  txn->gid = pstrdup(gid);
2684 
2685  /* The prepare info must have been updated in txn by now. */
2687 
2688  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2689  txn->commit_time, txn->origin_id, txn->origin_lsn);
2690 
2691  /*
2692  * We send the prepare for the concurrently aborted xacts so that later
2693  * when rollback prepared is decoded and sent, the downstream should be
2694  * able to rollback such a xact. See comments atop DecodePrepare.
2695  *
2696  * Note, for the concurrent_abort + streaming case a stream_prepare was
2697  * already sent within the ReorderBufferReplay call above.
2698  */
2699  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2700  rb->prepare(rb, txn, txn->final_lsn);
2701 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferPrepareCB prepare

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 688 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, IsInsertOrUpdate, IsSpecConfirm, IsSpecInsert, rbtxn_has_incomplete_tuple, RBTXN_HAS_SPEC_INSERT, rbtxn_has_spec_insert, RBTXN_HAS_TOAST_INSERT, rbtxn_has_toast_insert, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

691 {
692  ReorderBufferTXN *toptxn;
693 
694  /*
695  * The partial changes need to be processed only while streaming
696  * in-progress transactions.
697  */
698  if (!ReorderBufferCanStream(rb))
699  return;
700 
701  /* Get the top transaction. */
702  if (txn->toptxn != NULL)
703  toptxn = txn->toptxn;
704  else
705  toptxn = txn;
706 
707  /*
708  * Set the toast insert bit whenever we get toast insert to indicate a
709  * partial change and clear it when we get the insert or update on main
710  * table (Both update and insert will do the insert in the toast table).
711  */
712  if (toast_insert)
714  else if (rbtxn_has_toast_insert(toptxn) &&
715  IsInsertOrUpdate(change->action))
716  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
717 
718  /*
719  * Set the spec insert bit whenever we get the speculative insert to
720  * indicate the partial change and clear the same on speculative confirm.
721  */
722  if (IsSpecInsert(change->action))
723  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
724  else if (IsSpecConfirm(change->action))
725  {
726  /*
727  * Speculative confirm change must be preceded by speculative
728  * insertion.
729  */
730  Assert(rbtxn_has_spec_insert(toptxn));
731  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
732  }
733 
734  /*
735  * Stream the transaction if it is serialized before and the changes are
736  * now complete in the top-level transaction.
737  *
738  * The reason for doing the streaming of such a transaction as soon as we
739  * get the complete change for it is that previously it would have reached
740  * the memory threshold and wouldn't get streamed because of incomplete
741  * changes. Delaying such transactions would increase apply lag for them.
742  */
744  !(rbtxn_has_incomplete_tuple(toptxn)) &&
745  rbtxn_is_serialized(txn))
746  ReorderBufferStreamTXN(rb, toptxn);
747 }
#define RBTXN_HAS_TOAST_INSERT
#define rbtxn_has_incomplete_tuple(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_toast_insert(txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
#define rbtxn_has_spec_insert(txn)
#define Assert(condition)
Definition: c.h:804
#define IsSpecConfirm(action)
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define RBTXN_HAS_SPEC_INSERT
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2009 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

2014 {
2015  bool using_subtxn;
2017  ReorderBufferIterTXNState *volatile iterstate = NULL;
2018  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2019  ReorderBufferChange *volatile specinsert = NULL;
2020  volatile bool stream_started = false;
2021  ReorderBufferTXN *volatile curtxn = NULL;
2022 
2023  /* build data to be able to lookup the CommandIds of catalog tuples */
2025 
2026  /* setup the initial snapshot */
2027  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2028 
2029  /*
2030  * Decoding needs access to syscaches et al., which in turn use
2031  * heavyweight locks and such. Thus we need to have enough state around to
2032  * keep track of those. The easiest way is to simply use a transaction
2033  * internally. That also allows us to easily enforce that nothing writes
2034  * to the database by checking for xid assignments.
2035  *
2036  * When we're called via the SQL SRF there's already a transaction
2037  * started, so start an explicit subtransaction there.
2038  */
2039  using_subtxn = IsTransactionOrTransactionBlock();
2040 
2041  PG_TRY();
2042  {
2043  ReorderBufferChange *change;
2044 
2045  if (using_subtxn)
2046  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2047  else
2049 
2050  /*
2051  * We only need to send begin/begin-prepare for non-streamed
2052  * transactions.
2053  */
2054  if (!streaming)
2055  {
2056  if (rbtxn_prepared(txn))
2057  rb->begin_prepare(rb, txn);
2058  else
2059  rb->begin(rb, txn);
2060  }
2061 
2062  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2063  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2064  {
2065  Relation relation = NULL;
2066  Oid reloid;
2067 
2068  /*
2069  * We can't call start stream callback before processing first
2070  * change.
2071  */
2072  if (prev_lsn == InvalidXLogRecPtr)
2073  {
2074  if (streaming)
2075  {
2076  txn->origin_id = change->origin_id;
2077  rb->stream_start(rb, txn, change->lsn);
2078  stream_started = true;
2079  }
2080  }
2081 
2082  /*
2083  * Enforce correct ordering of changes, merged from multiple
2084  * subtransactions. The changes may have the same LSN due to
2085  * MULTI_INSERT xlog records.
2086  */
2087  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2088 
2089  prev_lsn = change->lsn;
2090 
2091  /*
2092  * Set the current xid to detect concurrent aborts. This is
2093  * required for the cases when we decode the changes before the
2094  * COMMIT record is processed.
2095  */
2096  if (streaming || rbtxn_prepared(change->txn))
2097  {
2098  curtxn = change->txn;
2099  SetupCheckXidLive(curtxn->xid);
2100  }
2101 
2102  switch (change->action)
2103  {
2105 
2106  /*
2107  * Confirmation for speculative insertion arrived. Simply
2108  * use as a normal record. It'll be cleaned up at the end
2109  * of INSERT processing.
2110  */
2111  if (specinsert == NULL)
2112  elog(ERROR, "invalid ordering of speculative insertion changes");
2113  Assert(specinsert->data.tp.oldtuple == NULL);
2114  change = specinsert;
2116 
2117  /* intentionally fall through */
2121  Assert(snapshot_now);
2122 
2123  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2124  change->data.tp.relnode.relNode);
2125 
2126  /*
2127  * Mapped catalog tuple without data, emitted while
2128  * catalog table was in the process of being rewritten. We
2129  * can fail to look up the relfilenode, because the
2130  * relmapper has no "historic" view, in contrast to the
2131  * normal catalog during decoding. Thus repeated rewrites
2132  * can cause a lookup failure. That's OK because we do not
2133  * decode catalog changes anyway. Normally such tuples
2134  * would be skipped over below, but we can't identify
2135  * whether the table should be logically logged without
2136  * mapping the relfilenode to the oid.
2137  */
2138  if (reloid == InvalidOid &&
2139  change->data.tp.newtuple == NULL &&
2140  change->data.tp.oldtuple == NULL)
2141  goto change_done;
2142  else if (reloid == InvalidOid)
2143  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2144  relpathperm(change->data.tp.relnode,
2145  MAIN_FORKNUM));
2146 
2147  relation = RelationIdGetRelation(reloid);
2148 
2149  if (!RelationIsValid(relation))
2150  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2151  reloid,
2152  relpathperm(change->data.tp.relnode,
2153  MAIN_FORKNUM));
2154 
2155  if (!RelationIsLogicallyLogged(relation))
2156  goto change_done;
2157 
2158  /*
2159  * Ignore temporary heaps created during DDL unless the
2160  * plugin has asked for them.
2161  */
2162  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2163  goto change_done;
2164 
2165  /*
2166  * For now ignore sequence changes entirely. Most of the
2167  * time they don't log changes using records we
2168  * understand, so it doesn't make sense to handle the few
2169  * cases we do.
2170  */
2171  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2172  goto change_done;
2173 
2174  /* user-triggered change */
2175  if (!IsToastRelation(relation))
2176  {
2177  ReorderBufferToastReplace(rb, txn, relation, change);
2178  ReorderBufferApplyChange(rb, txn, relation, change,
2179  streaming);
2180 
2181  /*
2182  * Only clear reassembled toast chunks if we're sure
2183  * they're not required anymore. The creator of the
2184  * tuple tells us.
2185  */
2186  if (change->data.tp.clear_toast_afterwards)
2187  ReorderBufferToastReset(rb, txn);
2188  }
2189  /* we're not interested in toast deletions */
2190  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2191  {
2192  /*
2193  * Need to reassemble the full toasted Datum in
2194  * memory, to ensure the chunks don't get reused till
2195  * we're done remove it from the list of this
2196  * transaction's changes. Otherwise it will get
2197  * freed/reused while restoring spooled data from
2198  * disk.
2199  */
2200  Assert(change->data.tp.newtuple != NULL);
2201 
2202  dlist_delete(&change->node);
2203  ReorderBufferToastAppendChunk(rb, txn, relation,
2204  change);
2205  }
2206 
2207  change_done:
2208 
2209  /*
2210  * Either speculative insertion was confirmed, or it was
2211  * unsuccessful and the record isn't needed anymore.
2212  */
2213  if (specinsert != NULL)
2214  {
2215  ReorderBufferReturnChange(rb, specinsert, true);
2216  specinsert = NULL;
2217  }
2218 
2219  if (RelationIsValid(relation))
2220  {
2221  RelationClose(relation);
2222  relation = NULL;
2223  }
2224  break;
2225 
2227 
2228  /*
2229  * Speculative insertions are dealt with by delaying the
2230  * processing of the insert until the confirmation record
2231  * arrives. For that we simply unlink the record from the
2232  * chain, so it does not get freed/reused while restoring
2233  * spooled data from disk.
2234  *
2235  * This is safe in the face of concurrent catalog changes
2236  * because the relevant relation can't be changed between
2237  * speculative insertion and confirmation due to
2238  * CheckTableNotInUse() and locking.
2239  */
2240 
2241  /* clear out a pending (and thus failed) speculation */
2242  if (specinsert != NULL)
2243  {
2244  ReorderBufferReturnChange(rb, specinsert, true);
2245  specinsert = NULL;
2246  }
2247 
2248  /* and memorize the pending insertion */
2249  dlist_delete(&change->node);
2250  specinsert = change;
2251  break;
2252 
2254  {
2255  int i;
2256  int nrelids = change->data.truncate.nrelids;
2257  int nrelations = 0;
2258  Relation *relations;
2259 
2260  relations = palloc0(nrelids * sizeof(Relation));
2261  for (i = 0; i < nrelids; i++)
2262  {
2263  Oid relid = change->data.truncate.relids[i];
2264  Relation relation;
2265 
2266  relation = RelationIdGetRelation(relid);
2267 
2268  if (!RelationIsValid(relation))
2269  elog(ERROR, "could not open relation with OID %u", relid);
2270 
2271  if (!RelationIsLogicallyLogged(relation))
2272  continue;
2273 
2274  relations[nrelations++] = relation;
2275  }
2276 
2277  /* Apply the truncate. */
2278  ReorderBufferApplyTruncate(rb, txn, nrelations,
2279  relations, change,
2280  streaming);
2281 
2282  for (i = 0; i < nrelations; i++)
2283  RelationClose(relations[i]);
2284 
2285  break;
2286  }
2287 
2289  ReorderBufferApplyMessage(rb, txn, change, streaming);
2290  break;
2291 
2293  /* Execute the invalidation messages locally */
2295  change->data.inval.ninvalidations,
2296  change->data.inval.invalidations);
2297  break;
2298 
2300  /* get rid of the old */
2301  TeardownHistoricSnapshot(false);
2302 
2303  if (snapshot_now->copied)
2304  {
2305  ReorderBufferFreeSnap(rb, snapshot_now);
2306  snapshot_now =
2307  ReorderBufferCopySnap(rb, change->data.snapshot,
2308  txn, command_id);
2309  }
2310 
2311  /*
2312  * Restored from disk, need to be careful not to double
2313  * free. We could introduce refcounting for that, but for
2314  * now this seems infrequent enough not to care.
2315  */
2316  else if (change->data.snapshot->copied)
2317  {
2318  snapshot_now =
2319  ReorderBufferCopySnap(rb, change->data.snapshot,
2320  txn, command_id);
2321  }
2322  else
2323  {
2324  snapshot_now = change->data.snapshot;
2325  }
2326 
2327  /* and continue with the new one */
2328  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2329  break;
2330 
2332  Assert(change->data.command_id != InvalidCommandId);
2333 
2334  if (command_id < change->data.command_id)
2335  {
2336  command_id = change->data.command_id;
2337 
2338  if (!snapshot_now->copied)
2339  {
2340  /* we don't use the global one anymore */
2341  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2342  txn, command_id);
2343  }
2344 
2345  snapshot_now->curcid = command_id;
2346 
2347  TeardownHistoricSnapshot(false);
2348  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2349  }
2350 
2351  break;
2352 
2354  elog(ERROR, "tuplecid value in changequeue");
2355  break;
2356  }
2357  }
2358 
2359  /*
2360  * There's a speculative insertion remaining, just clean in up, it
2361  * can't have been successful, otherwise we'd gotten a confirmation
2362  * record.
2363  */
2364  if (specinsert)
2365  {
2366  ReorderBufferReturnChange(rb, specinsert, true);
2367  specinsert = NULL;
2368  }
2369 
2370  /* clean up the iterator */
2371  ReorderBufferIterTXNFinish(rb, iterstate);
2372  iterstate = NULL;
2373 
2374  /*
2375  * Update total transaction count and total bytes processed by the
2376  * transaction and its subtransactions. Ensure to not count the
2377  * streamed transaction multiple times.
2378  *
2379  * Note that the statistics computation has to be done after
2380  * ReorderBufferIterTXNFinish as it releases the serialized change
2381  * which we have already accounted in ReorderBufferIterTXNNext.
2382  */
2383  if (!rbtxn_is_streamed(txn))
2384  rb->totalTxns++;
2385 
2386  rb->totalBytes += txn->total_size;
2387 
2388  /*
2389  * Done with current changes, send the last message for this set of
2390  * changes depending upon streaming mode.
2391  */
2392  if (streaming)
2393  {
2394  if (stream_started)
2395  {
2396  rb->stream_stop(rb, txn, prev_lsn);
2397  stream_started = false;
2398  }
2399  }
2400  else
2401  {
2402  /*
2403  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2404  * regular ones).
2405  */
2406  if (rbtxn_prepared(txn))
2407  rb->prepare(rb, txn, commit_lsn);
2408  else
2409  rb->commit(rb, txn, commit_lsn);
2410  }
2411 
2412  /* this is just a sanity check against bad output plugin behaviour */
2414  elog(ERROR, "output plugin used XID %u",
2416 
2417  /*
2418  * Remember the command ID and snapshot for the next set of changes in
2419  * streaming mode.
2420  */
2421  if (streaming)
2422  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2423  else if (snapshot_now->copied)
2424  ReorderBufferFreeSnap(rb, snapshot_now);
2425 
2426  /* cleanup */
2427  TeardownHistoricSnapshot(false);
2428 
2429  /*
2430  * Aborting the current (sub-)transaction as a whole has the right
2431  * semantics. We want all locks acquired in here to be released, not
2432  * reassigned to the parent and we do not want any database access
2433  * have persistent effects.
2434  */
2436 
2437  /* make sure there's no cache pollution */
2439 
2440  if (using_subtxn)
2442 
2443  /*
2444  * We are here due to one of the four reasons: 1. Decoding an
2445  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2446  * prepared txn that was (partially) streamed. 4. Decoding a committed
2447  * txn.
2448  *
2449  * For 1, we allow truncation of txn data by removing the changes
2450  * already streamed but still keeping other things like invalidations,
2451  * snapshot, and tuplecids. For 2 and 3, we indicate
2452  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2453  * data as the entire transaction has been decoded except for commit.
2454  * For 4, as the entire txn has been decoded, we can fully clean up
2455  * the TXN reorder buffer.
2456  */
2457  if (streaming || rbtxn_prepared(txn))
2458  {
2460  /* Reset the CheckXidAlive */
2462  }
2463  else
2464  ReorderBufferCleanupTXN(rb, txn);
2465  }
2466  PG_CATCH();
2467  {
2468  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2469  ErrorData *errdata = CopyErrorData();
2470 
2471  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2472  if (iterstate)
2473  ReorderBufferIterTXNFinish(rb, iterstate);
2474 
2476 
2477  /*
2478  * Force cache invalidation to happen outside of a valid transaction
2479  * to prevent catalog access as we just caught an error.
2480  */
2482 
2483  /* make sure there's no cache pollution */
2485  txn->invalidations);
2486 
2487  if (using_subtxn)
2489 
2490  /*
2491  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2492  * abort of the (sub)transaction we are streaming or preparing. We
2493  * need to do the cleanup and return gracefully on this error, see
2494  * SetupCheckXidLive.
2495  *
2496  * This error code can be thrown by one of the callbacks we call
2497  * during decoding so we need to ensure that we return gracefully only
2498  * when we are sending the data in streaming mode and the streaming is
2499  * not finished yet or when we are sending the data out on a PREPARE
2500  * during a two-phase commit.
2501  */
2502  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2503  (stream_started || rbtxn_prepared(txn)))
2504  {
2505  /* curtxn must be set for streaming or prepared transactions */
2506  Assert(curtxn);
2507 
2508  /* Cleanup the temporary error state. */
2509  FlushErrorState();
2510  FreeErrorData(errdata);
2511  errdata = NULL;
2512  curtxn->concurrent_abort = true;
2513 
2514  /* Reset the TXN so that it is allowed to stream remaining data. */
2515  ReorderBufferResetTXN(rb, txn, snapshot_now,
2516  command_id, prev_lsn,
2517  specinsert);
2518  }
2519  else
2520  {
2521  ReorderBufferCleanupTXN(rb, txn);
2522  MemoryContextSwitchTo(ecxt);
2523  PG_RE_THROW();
2524  }
2525  }
2526  PG_END_TRY();
2527 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsToastRelation(Relation relation)
Definition: catalog.c:145
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_prepared(txn)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:381
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:655
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void FlushErrorState(void)
Definition: elog.c:1654
#define rbtxn_is_streamed(txn)
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
#define ERROR
Definition: elog.h:46
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define RelationIsValid(relation)
Definition: rel.h:442
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferBeginCB begin_prepare
void RelationClose(Relation relation)
Definition: relcache.c:2098
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:89
TransactionId CheckXidAlive
Definition: xact.c:95
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:1093
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:604
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
#define PG_CATCH()
Definition: elog.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2838
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
SharedInvalidationMessage * invalidations
#define PG_RE_THROW()
Definition: elog.h:354
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:232
int i
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:313
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1992
#define PG_END_TRY()
Definition: elog.h:338
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2985 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2986 {
2987  /* many records won't have an xid assigned, centralize check here */
2988  if (xid != InvalidTransactionId)
2989  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2990 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 754 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

756 {
757  ReorderBufferTXN *txn;
758 
759  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
760 
761  /*
762  * While streaming the previous changes we have detected that the
763  * transaction is aborted. So there is no point in collecting further
764  * changes for it.
765  */
766  if (txn->concurrent_abort)
767  {
768  /*
769  * We don't need to update memory accounting for this change as we
770  * have not added it to the queue yet.
771  */
772  ReorderBufferReturnChange(rb, change, false);
773  return;
774  }
775 
776  change->lsn = lsn;
777  change->txn = txn;
778 
779  Assert(InvalidXLogRecPtr != lsn);
780  dlist_push_tail(&txn->changes, &change->node);
781  txn->nentries++;
782  txn->nentries_mem++;
783 
784  /* update memory accounting information */
785  ReorderBufferChangeMemoryUpdate(rb, change, true);
786 
787  /* process partial change */
788  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
789 
790  /* check the memory limits and evict something if needed */
792 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 799 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

803 {
804  if (transactional)
805  {
806  MemoryContext oldcontext;
807  ReorderBufferChange *change;
808 
810 
811  oldcontext = MemoryContextSwitchTo(rb->context);
812 
813  change = ReorderBufferGetChange(rb);
815  change->data.msg.prefix = pstrdup(prefix);
816  change->data.msg.message_size = message_size;
817  change->data.msg.message = palloc(message_size);
818  memcpy(change->data.msg.message, message, message_size);
819 
820  ReorderBufferQueueChange(rb, xid, lsn, change, false);
821 
822  MemoryContextSwitchTo(oldcontext);
823  }
824  else
825  {
826  ReorderBufferTXN *txn = NULL;
827  volatile Snapshot snapshot_now = snapshot;
828 
829  if (xid != InvalidTransactionId)
830  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
831 
832  /* setup snapshot to allow catalog access */
833  SetupHistoricSnapshot(snapshot_now, NULL);
834  PG_TRY();
835  {
836  rb->message(rb, txn, lsn, false, prefix, message_size, message);
837 
839  }
840  PG_CATCH();
841  {
843  PG_RE_THROW();
844  }
845  PG_END_TRY();
846  }
847 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:323
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
#define PG_RE_THROW()
Definition: elog.h:354
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2623 of file reorderbuffer.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2627 {
2628  ReorderBufferTXN *txn;
2629 
2630  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2631 
2632  /* unknown transaction, nothing to do */
2633  if (txn == NULL)
2634  return false;
2635 
2636  /*
2637  * Remember the prepare information to be later used by commit prepared in
2638  * case we skip doing prepare.
2639  */
2640  txn->final_lsn = prepare_lsn;
2641  txn->end_lsn = end_lsn;
2642  txn->commit_time = prepare_time;
2643  txn->origin_id = origin_id;
2644  txn->origin_lsn = origin_lsn;
2645 
2646  return true;
2647 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2540 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

2545 {
2546  Snapshot snapshot_now;
2547  CommandId command_id = FirstCommandId;
2548 
2549  txn->final_lsn = commit_lsn;
2550  txn->end_lsn = end_lsn;
2551  txn->commit_time = commit_time;
2552  txn->origin_id = origin_id;
2553  txn->origin_lsn = origin_lsn;
2554 
2555  /*
2556  * If the transaction was (partially) streamed, we need to commit it in a
2557  * 'streamed' way. That is, we first stream the remaining part of the
2558  * transaction, and then invoke stream_commit message.
2559  *
2560  * Called after everything (origin ID, LSN, ...) is stored in the
2561  * transaction to avoid passing that information directly.
2562  */
2563  if (rbtxn_is_streamed(txn))
2564  {
2565  ReorderBufferStreamCommit(rb, txn);
2566  return;
2567  }
2568 
2569  /*
2570  * If this transaction has no snapshot, it didn't make any changes to the
2571  * database, so there's nothing to decode. Note that
2572  * ReorderBufferCommitChild will have transferred any snapshots from
2573  * subtransactions if there were any.
2574  */
2575  if (txn->base_snapshot == NULL)
2576  {
2577  Assert(txn->ninvalidations == 0);
2578 
2579  /*
2580  * Removing this txn before a commit might result in the computation
2581  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2582  */
2583  if (!rbtxn_prepared(txn))
2584  ReorderBufferCleanupTXN(rb, txn);
2585  return;
2586  }
2587 
2588  snapshot_now = txn->base_snapshot;
2589 
2590  /* Process and send the changes to output plugin. */
2591  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2592  command_id, false);
2593 }
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
Snapshot base_snapshot
#define rbtxn_prepared(txn)
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:603
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 1966 of file reorderbuffer.c.

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

1971 {
1972  /* Discard the changes that we just streamed */
1974 
1975  /* Free all resources allocated for toast reconstruction */
1976  ReorderBufferToastReset(rb, txn);
1977 
1978  /* Return the spec insert change if it is not NULL */
1979  if (specinsert != NULL)
1980  {
1981  ReorderBufferReturnChange(rb, specinsert, true);
1982  specinsert = NULL;
1983  }
1984 
1985  /*
1986  * For the streaming case, stop the stream and remember the command ID and
1987  * snapshot for the streaming run.
1988  */
1989  if (rbtxn_is_streamed(txn))
1990  {
1991  rb->stream_stop(rb, txn, last_lsn);
1992  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1993  }
1994 }
#define rbtxn_prepared(txn)
#define rbtxn_is_streamed(txn)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 4171 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

4173 {
4174  ReorderBufferDiskChange *ondisk;
4175  ReorderBufferChange *change;
4176 
4177  ondisk = (ReorderBufferDiskChange *) data;
4178 
4179  change = ReorderBufferGetChange(rb);
4180 
4181  /* copy static part */
4182  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4183 
4184  data += sizeof(ReorderBufferDiskChange);
4185 
4186  /* restore individual stuff */
4187  switch (change->action)
4188  {
4189  /* fall through these, they're all similar enough */
4194  if (change->data.tp.oldtuple)
4195  {
4196  uint32 tuplelen = ((HeapTuple) data)->t_len;
4197 
4198  change->data.tp.oldtuple =
4200 
4201  /* restore ->tuple */
4202  memcpy(&change->data.tp.oldtuple->tuple, data,
4203  sizeof(HeapTupleData));
4204  data += sizeof(HeapTupleData);
4205 
4206  /* reset t_data pointer into the new tuplebuf */
4207  change->data.tp.oldtuple->tuple.t_data =
4208  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4209 
4210  /* restore tuple data itself */
4211  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4212  data += tuplelen;
4213  }
4214 
4215  if (change->data.tp.newtuple)
4216  {
4217  /* here, data might not be suitably aligned! */
4218  uint32 tuplelen;
4219 
4220  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4221  sizeof(uint32));
4222 
4223  change->data.tp.newtuple =
4225 
4226  /* restore ->tuple */
4227  memcpy(&change->data.tp.newtuple->tuple, data,
4228  sizeof(HeapTupleData));
4229  data += sizeof(HeapTupleData);
4230 
4231  /* reset t_data pointer into the new tuplebuf */
4232  change->data.tp.newtuple->tuple.t_data =
4233  ReorderBufferTupleBufData(change->data.tp.newtuple);
4234 
4235  /* restore tuple data itself */
4236  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4237  data += tuplelen;
4238  }
4239 
4240  break;
4242  {
4243  Size prefix_size;
4244 
4245  /* read prefix */
4246  memcpy(&prefix_size, data, sizeof(Size));
4247  data += sizeof(Size);
4248  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4249  prefix_size);
4250  memcpy(change->data.msg.prefix, data, prefix_size);
4251  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4252  data += prefix_size;
4253 
4254  /* read the message */
4255  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4256  data += sizeof(Size);
4257  change->data.msg.message = MemoryContextAlloc(rb->context,
4258  change->data.msg.message_size);
4259  memcpy(change->data.msg.message, data,
4260  change->data.msg.message_size);
4261  data += change->data.msg.message_size;
4262 
4263  break;
4264  }
4266  {
4267  Size inval_size = sizeof(SharedInvalidationMessage) *
4268  change->data.inval.ninvalidations;
4269 
4270  change->data.inval.invalidations =
4271  MemoryContextAlloc(rb->context, inval_size);
4272 
4273  /* read the message */
4274  memcpy(change->data.inval.invalidations, data, inval_size);
4275 
4276  break;
4277  }
4279  {
4280  Snapshot oldsnap;
4281  Snapshot newsnap;
4282  Size size;
4283 
4284  oldsnap = (Snapshot) data;
4285 
4286  size = sizeof(SnapshotData) +
4287  sizeof(TransactionId) * oldsnap->xcnt +
4288  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4289 
4290  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4291 
4292  newsnap = change->data.snapshot;
4293 
4294  memcpy(newsnap, data, size);
4295  newsnap->xip = (TransactionId *)
4296  (((char *) newsnap) + sizeof(SnapshotData));
4297  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4298  newsnap->copied = true;
4299  break;
4300  }
4301  /* the base struct contains all the data, easy peasy */
4303  {
4304  Oid *relids;
4305 
4306  relids = ReorderBufferGetRelids(rb,
4307  change->data.truncate.nrelids);
4308  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4309  change->data.truncate.relids = relids;
4310 
4311  break;
4312  }
4316  break;
4317  }
4318 
4319  dlist_push_tail(&txn->changes, &change->node);
4320  txn->nentries_mem++;
4321 
4322  /*
4323  * Update memory accounting for the restored change. We need to do this
4324  * although we don't check the memory limit when restoring the changes in
4325  * this branch (we only do that when initially queueing the changes after
4326  * decoding), because we will release the changes later, and that will
4327  * update the accounting too (subtracting the size from the counters). And
4328  * we don't want to underflow there.
4329  */
4330  ReorderBufferChangeMemoryUpdate(rb, change, true);
4331 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:587
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:441
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:727
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4030 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

4032 {
4033  Size restored = 0;
4034  XLogSegNo last_segno;
4035  dlist_mutable_iter cleanup_iter;
4036  File *fd = &file->vfd;
4037 
4040 
4041  /* free current entries, so we have memory for more */
4042  dlist_foreach_modify(cleanup_iter, &txn->changes)
4043  {
4045  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4046 
4047  dlist_delete(&cleanup->node);
4048  ReorderBufferReturnChange(rb, cleanup, true);
4049  }
4050  txn->nentries_mem = 0;
4051  Assert(dlist_is_empty(&txn->changes));
4052 
4053  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4054 
4055  while (restored < max_changes_in_memory && *segno <= last_segno)
4056  {
4057  int readBytes;
4058  ReorderBufferDiskChange *ondisk;
4059 
4060  if (*fd == -1)
4061  {
4062  char path[MAXPGPATH];
4063 
4064  /* first time in */
4065  if (*segno == 0)
4066  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4067 
4068  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4069 
4070  /*
4071  * No need to care about TLIs here, only used during a single run,
4072  * so each LSN only maps to a specific WAL record.
4073  */
4075  *segno);
4076 
4077  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4078 
4079  /* No harm in resetting the offset even in case of failure */
4080  file->curOffset = 0;
4081 
4082  if (*fd < 0 && errno == ENOENT)
4083  {
4084  *fd = -1;
4085  (*segno)++;
4086  continue;
4087  }
4088  else if (*fd < 0)
4089  ereport(ERROR,
4091  errmsg("could not open file \"%s\": %m",
4092  path)));
4093  }
4094 
4095  /*
4096  * Read the statically sized part of a change which has information
4097  * about the total size. If we couldn't read a record, we're at the
4098  * end of this file.
4099  */
4101  readBytes = FileRead(file->vfd, rb->outbuf,
4102  sizeof(ReorderBufferDiskChange),
4104 
4105  /* eof */
4106  if (readBytes == 0)
4107  {
4108  FileClose(*fd);
4109  *fd = -1;
4110  (*segno)++;
4111  continue;
4112  }
4113  else if (readBytes < 0)
4114  ereport(ERROR,
4116  errmsg("could not read from reorderbuffer spill file: %m")));
4117  else if (readBytes != sizeof(ReorderBufferDiskChange))
4118  ereport(ERROR,
4120  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4121  readBytes,
4122  (uint32) sizeof(ReorderBufferDiskChange))));
4123 
4124  file->curOffset += readBytes;
4125 
4126  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4127 
4129  sizeof(ReorderBufferDiskChange) + ondisk->size);
4130  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4131 
4132  readBytes = FileRead(file->vfd,
4133  rb->outbuf + sizeof(ReorderBufferDiskChange),
4134  ondisk->size - sizeof(ReorderBufferDiskChange),
4135  file->curOffset,
4137 
4138  if (readBytes < 0)
4139  ereport(ERROR,
4141  errmsg("could not read from reorderbuffer spill file: %m")));
4142  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4143  ereport(ERROR,
4145  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4146  readBytes,
4147  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4148 
4149  file->curOffset += readBytes;
4150 
4151  /*
4152  * ok, read a full change from disk, now restore it into proper
4153  * in-memory format
4154  */
4155  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4156  restored++;
4157  }
4158 
4159  return restored;
4160 }