PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirmOrAbort

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4886 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4887 {
4888  char path[MAXPGPATH];
4889  int fd;
4890  int readBytes;
4892 
4893  sprintf(path, "pg_logical/mappings/%s", fname);
4894  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4895  if (fd < 0)
4896  ereport(ERROR,
4898  errmsg("could not open file \"%s\": %m", path)));
4899 
4900  while (true)
4901  {
4904  ReorderBufferTupleCidEnt *new_ent;
4905  bool found;
4906 
4907  /* be careful about padding */
4908  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4909 
4910  /* read all mappings till the end of the file */
4912  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4914 
4915  if (readBytes < 0)
4916  ereport(ERROR,
4918  errmsg("could not read file \"%s\": %m",
4919  path)));
4920  else if (readBytes == 0) /* EOF */
4921  break;
4922  else if (readBytes != sizeof(LogicalRewriteMappingData))
4923  ereport(ERROR,
4925  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4926  path, readBytes,
4927  (int32) sizeof(LogicalRewriteMappingData))));
4928 
4929  key.relnode = map.old_node;
4930  ItemPointerCopy(&map.old_tid,
4931  &key.tid);
4932 
4933 
4934  ent = (ReorderBufferTupleCidEnt *)
4935  hash_search(tuplecid_data,
4936  (void *) &key,
4937  HASH_FIND,
4938  NULL);
4939 
4940  /* no existing mapping, no need to update */
4941  if (!ent)
4942  continue;
4943 
4944  key.relnode = map.new_node;
4945  ItemPointerCopy(&map.new_tid,
4946  &key.tid);
4947 
4948  new_ent = (ReorderBufferTupleCidEnt *)
4949  hash_search(tuplecid_data,
4950  (void *) &key,
4951  HASH_ENTER,
4952  &found);
4953 
4954  if (found)
4955  {
4956  /*
4957  * Make sure the existing mapping makes sense. We sometime update
4958  * old records that did not yet have a cmax (e.g. pg_class' own
4959  * entry while rewriting it) during rewrites, so allow that.
4960  */
4961  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4962  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4963  }
4964  else
4965  {
4966  /* update mapping */
4967  new_ent->cmin = ent->cmin;
4968  new_ent->cmax = ent->cmax;
4969  new_ent->combocid = ent->combocid;
4970  }
4971  }
4972 
4973  if (CloseTransientFile(fd) != 0)
4974  ereport(ERROR,
4976  errmsg("could not close file \"%s\": %m", path)));
4977 }
static void pgstat_report_wait_end(void)
Definition: wait_event.h:274
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
signed int int32
Definition: c.h:429
#define sprintf
Definition: port.h:219
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2509
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:258
int CloseTransientFile(int fd)
Definition: fd.c:2686
#define InvalidCommandId
Definition: c.h:604
#define ereport(elevel,...)
Definition: elog.h:157
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 928 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

929 {
930 #ifdef USE_ASSERT_CHECKING
931  dlist_iter iter;
932  XLogRecPtr prev_lsn = txn->first_lsn;
933 
934  dlist_foreach(iter, &txn->changes)
935  {
936  ReorderBufferChange *cur_change;
937 
938  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
939 
941  Assert(cur_change->lsn != InvalidXLogRecPtr);
942  Assert(txn->first_lsn <= cur_change->lsn);
943 
944  if (txn->end_lsn != InvalidXLogRecPtr)
945  Assert(cur_change->lsn <= txn->end_lsn);
946 
947  Assert(prev_lsn <= cur_change->lsn);
948 
949  prev_lsn = cur_change->lsn;
950  }
951 #endif
952 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 871 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

872 {
873 #ifdef USE_ASSERT_CHECKING
874  dlist_iter iter;
875  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
876  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
877 
878  dlist_foreach(iter, &rb->toplevel_by_lsn)
879  {
881  iter.cur);
882 
883  /* start LSN must be set */
884  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
885 
886  /* If there is an end LSN, it must be higher than start LSN */
887  if (cur_txn->end_lsn != InvalidXLogRecPtr)
888  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
889 
890  /* Current initial LSN must be strictly higher than previous */
891  if (prev_first_lsn != InvalidXLogRecPtr)
892  Assert(prev_first_lsn < cur_txn->first_lsn);
893 
894  /* known-as-subtxn txns must not be listed */
895  Assert(!rbtxn_is_known_subxact(cur_txn));
896 
897  prev_first_lsn = cur_txn->first_lsn;
898  }
899 
901  {
903  base_snapshot_node,
904  iter.cur);
905 
906  /* base snapshot (and its LSN) must be set */
907  Assert(cur_txn->base_snapshot != NULL);
909 
910  /* current LSN must be strictly higher than previous */
911  if (prev_base_snap_lsn != InvalidXLogRecPtr)
912  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
913 
914  /* known-as-subtxn txns must not be listed */
915  Assert(!rbtxn_is_known_subxact(cur_txn));
916 
917  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
918  }
919 #endif
920 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 4994 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4995 {
4998 
4999  if (a->lsn < b->lsn)
5000  return -1;
5001  else if (a->lsn > b->lsn)
5002  return 1;
5003  return 0;
5004 }
#define lfirst(lc)
Definition: pg_list.h:169

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2823 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2824 {
2825  ReorderBufferTXN *txn;
2826 
2827  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2828  false);
2829 
2830  /* unknown, nothing to remove */
2831  if (txn == NULL)
2832  return;
2833 
2834  /* For streamed transactions notify the remote node about the abort. */
2835  if (rbtxn_is_streamed(txn))
2836  {
2837  rb->stream_abort(rb, txn, lsn);
2838 
2839  /*
2840  * We might have decoded changes for this transaction that could load
2841  * the cache as per the current transaction's view (consider DDL's
2842  * happened in this transaction). We don't want the decoding of future
2843  * transactions to use those cache entries so execute invalidations.
2844  */
2845  if (txn->ninvalidations > 0)
2847  txn->invalidations);
2848  }
2849 
2850  /* cosmetic... */
2851  txn->final_lsn = lsn;
2852 
2853  /* remove potential on-disk data, and deallocate */
2854  ReorderBufferCleanupTXN(rb, txn);
2855 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2865 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2866 {
2867  dlist_mutable_iter it;
2868 
2869  /*
2870  * Iterate through all (potential) toplevel TXNs and abort all that are
2871  * older than what possibly can be running. Once we've found the first
2872  * that is alive we stop, there might be some that acquired an xid earlier
2873  * but started writing later, but it's unlikely and they will be cleaned
2874  * up in a later call to this function.
2875  */
2877  {
2878  ReorderBufferTXN *txn;
2879 
2880  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2881 
2882  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2883  {
2884  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2885 
2886  /* remove potential on-disk data, and deallocate this tx */
2887  ReorderBufferCleanupTXN(rb, txn);
2888  }
2889  else
2890  return;
2891  }
2892 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:232

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3197 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

3200 {
3201  ReorderBufferTXN *txn;
3202  MemoryContext oldcontext;
3203  ReorderBufferChange *change;
3204 
3205  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3206 
3207  oldcontext = MemoryContextSwitchTo(rb->context);
3208 
3209  /*
3210  * Collect all the invalidations under the top transaction so that we can
3211  * execute them all together. See comment atop this function
3212  */
3213  if (txn->toptxn)
3214  txn = txn->toptxn;
3215 
3216  Assert(nmsgs > 0);
3217 
3218  /* Accumulate invalidations. */
3219  if (txn->ninvalidations == 0)
3220  {
3221  txn->ninvalidations = nmsgs;
3223  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3224  memcpy(txn->invalidations, msgs,
3225  sizeof(SharedInvalidationMessage) * nmsgs);
3226  }
3227  else
3228  {
3231  (txn->ninvalidations + nmsgs));
3232 
3233  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3234  nmsgs * sizeof(SharedInvalidationMessage));
3235  txn->ninvalidations += nmsgs;
3236  }
3237 
3238  change = ReorderBufferGetChange(rb);
3240  change->data.inval.ninvalidations = nmsgs;
3241  change->data.inval.invalidations = (SharedInvalidationMessage *)
3242  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3243  memcpy(change->data.inval.invalidations, msgs,
3244  sizeof(SharedInvalidationMessage) * nmsgs);
3245 
3246  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3247 
3248  MemoryContextSwitchTo(oldcontext);
3249 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferTXN * toptxn
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3079 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

3081 {
3083 
3084  change->data.command_id = cid;
3086 
3087  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3088 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3161 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

3165 {
3167  ReorderBufferTXN *txn;
3168 
3169  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3170 
3171  change->data.tuplecid.node = node;
3172  change->data.tuplecid.tid = tid;
3173  change->data.tuplecid.cmin = cmin;
3174  change->data.tuplecid.cmax = cmax;
3175  change->data.tuplecid.combocid = combocid;
3176  change->lsn = lsn;
3177  change->txn = txn;
3179 
3180  dlist_push_tail(&txn->tuplecids, &change->node);
3181  txn->ntuplecids++;
3182 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@101 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3030 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

3032 {
3034 
3035  change->data.snapshot = snap;
3037 
3038  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3039 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 301 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

302 {
303  ReorderBuffer *buffer;
304  HASHCTL hash_ctl;
305  MemoryContext new_ctx;
306 
307  Assert(MyReplicationSlot != NULL);
308 
309  /* allocate memory in own context, to have better accountability */
311  "ReorderBuffer",
313 
314  buffer =
315  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
316 
317  memset(&hash_ctl, 0, sizeof(hash_ctl));
318 
319  buffer->context = new_ctx;
320 
321  buffer->change_context = SlabContextCreate(new_ctx,
322  "Change",
324  sizeof(ReorderBufferChange));
325 
326  buffer->txn_context = SlabContextCreate(new_ctx,
327  "TXN",
329  sizeof(ReorderBufferTXN));
330 
331  buffer->tup_context = GenerationContextCreate(new_ctx,
332  "Tuples",
334 
335  hash_ctl.keysize = sizeof(TransactionId);
336  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
337  hash_ctl.hcxt = buffer->context;
338 
339  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
341 
343  buffer->by_txn_last_txn = NULL;
344 
345  buffer->outbuf = NULL;
346  buffer->outbufsize = 0;
347  buffer->size = 0;
348 
349  buffer->spillTxns = 0;
350  buffer->spillCount = 0;
351  buffer->spillBytes = 0;
352  buffer->streamTxns = 0;
353  buffer->streamCount = 0;
354  buffer->streamBytes = 0;
355  buffer->totalTxns = 0;
356  buffer->totalBytes = 0;
357 
359 
360  dlist_init(&buffer->toplevel_by_lsn);
362 
363  /*
364  * Ensure there's no stale data from prior uses of this slot, in case some
365  * prior exit avoided calling ReorderBufferFree. Failure to do this can
366  * produce duplicated txns, and it's very cheap if there's nothing there.
367  */
369 
370  return buffer;
371 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:76
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
ReplicationSlotPersistentData data
Definition: slot.h:147
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:197
MemoryContext context
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:75
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:224
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define NameStr(name)
Definition: c.h:681
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1910 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1913 {
1914  if (streaming)
1915  rb->stream_change(rb, txn, relation, change);
1916  else
1917  rb->apply_change(rb, txn, relation, change);
1918 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1938 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1940 {
1941  if (streaming)
1942  rb->stream_message(rb, txn, change->lsn, true,
1943  change->data.msg.prefix,
1944  change->data.msg.message_size,
1945  change->data.msg.message);
1946  else
1947  rb->message(rb, txn, change->lsn, true,
1948  change->data.msg.prefix,
1949  change->data.msg.message_size,
1950  change->data.msg.message);
1951 }
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  nrelations,
Relation *  relations,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1924 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1927 {
1928  if (streaming)
1929  rb->stream_truncate(rb, txn, nrelations, relations, change);
1930  else
1931  rb->apply_truncate(rb, txn, nrelations, relations, change);
1932 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1014 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

1016 {
1017  ReorderBufferTXN *txn;
1018  ReorderBufferTXN *subtxn;
1019  bool new_top;
1020  bool new_sub;
1021 
1022  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1023  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1024 
1025  if (!new_sub)
1026  {
1027  if (rbtxn_is_known_subxact(subtxn))
1028  {
1029  /* already associated, nothing to do */
1030  return;
1031  }
1032  else
1033  {
1034  /*
1035  * We already saw this transaction, but initially added it to the
1036  * list of top-level txns. Now that we know it's not top-level,
1037  * remove it from there.
1038  */
1039  dlist_delete(&subtxn->node);
1040  }
1041  }
1042 
1043  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1044  subtxn->toplevel_xid = xid;
1045  Assert(subtxn->nsubtxns == 0);
1046 
1047  /* set the reference to top-level transaction */
1048  subtxn->toptxn = txn;
1049 
1050  /* add to subtransaction list */
1051  dlist_push_tail(&txn->subtxns, &subtxn->node);
1052  txn->nsubtxns++;
1053 
1054  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1055  ReorderBufferTransferSnapToParent(txn, subtxn);
1056 
1057  /* Verify LSN-ordering invariant */
1058  AssertTXNLsnOrder(rb);
1059 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1673 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1674 {
1675  dlist_iter iter;
1676  HASHCTL hash_ctl;
1677 
1678  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1679  return;
1680 
1681  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1682  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1683  hash_ctl.hcxt = rb->context;
1684 
1685  /*
1686  * create the hash with the exact number of to-be-stored tuplecids from
1687  * the start
1688  */
1689  txn->tuplecid_hash =
1690  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1691  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1692 
1693  dlist_foreach(iter, &txn->tuplecids)
1694  {
1695  ReorderBufferTupleCidKey key;
1696  ReorderBufferTupleCidEnt *ent;
1697  bool found;
1698  ReorderBufferChange *change;
1699 
1700  change = dlist_container(ReorderBufferChange, node, iter.cur);
1701 
1702  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1703 
1704  /* be careful about padding */
1705  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1706 
1707  key.relnode = change->data.tuplecid.node;
1708 
1709  ItemPointerCopy(&change->data.tuplecid.tid,
1710  &key.tid);
1711 
1712  ent = (ReorderBufferTupleCidEnt *)
1713  hash_search(txn->tuplecid_hash,
1714  (void *) &key,
1715  HASH_ENTER,
1716  &found);
1717  if (!found)
1718  {
1719  ent->cmin = change->data.tuplecid.cmin;
1720  ent->cmax = change->data.tuplecid.cmax;
1721  ent->combocid = change->data.tuplecid.combocid;
1722  }
1723  else
1724  {
1725  /*
1726  * Maybe we already saw this tuple before in this transaction, but
1727  * if so it must have the same cmin.
1728  */
1729  Assert(ent->cmin == change->data.tuplecid.cmin);
1730 
1731  /*
1732  * cmax may be initially invalid, but once set it can only grow,
1733  * and never become invalid again.
1734  */
1735  Assert((ent->cmax == InvalidCommandId) ||
1736  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1737  (change->data.tuplecid.cmax > ent->cmax)));
1738  ent->cmax = change->data.tuplecid.cmax;
1739  }
1740  }
1741 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
Size entrysize
Definition: hsearch.h:76
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
union ReorderBufferChange::@97 data
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext context
#define InvalidCommandId
Definition: c.h:604
Size keysize
Definition: hsearch.h:75
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
struct ReorderBufferChange::@97::@101 tuplecid
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *  rb)
inline static

Definition at line 3833 of file reorderbuffer.c.

References XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3834 {
3835  LogicalDecodingContext *ctx = rb->private_data;
3836  SnapBuild *builder = ctx->snapshot_builder;
3837 
3838  /* We can't start streaming unless a consistent state is reached. */
3839  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3840  return false;
3841 
3842  /*
3843  * We can't start streaming immediately even if the streaming is enabled
3844  * because we previously decoded this transaction and now just are
3845  * restarting.
3846  */
3847  if (ReorderBufferCanStream(rb) &&
3848  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3849  return true;
3850 
3851  return false;
3852 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:394
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *  rb)
inline static

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
bool  addition,
Size  sz 
)
static

Definition at line 3104 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

3107 {
3108  ReorderBufferTXN *txn;
3109  ReorderBufferTXN *toptxn;
3110 
3111  Assert(change->txn);
3112 
3113  /*
3114  * Ignore tuple CID changes, because those are not evicted when reaching
3115  * memory limit. So we just don't count them, because it might easily
3116  * trigger a pointless attempt to spill.
3117  */
3118  if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3119  return;
3120 
3121  txn = change->txn;
3122 
3123  /*
3124  * Update the total size in top level as well. This is later used to
3125  * compute the decoding stats.
3126  */
3127  if (txn->toptxn != NULL)
3128  toptxn = txn->toptxn;
3129  else
3130  toptxn = txn;
3131 
3132  if (addition)
3133  {
3134  txn->size += sz;
3135  rb->size += sz;
3136 
3137  /* Update the total size in the top transaction. */
3138  toptxn->total_size += sz;
3139  }
3140  else
3141  {
3142  Assert((rb->size >= sz) && (txn->size >= sz));
3143  txn->size -= sz;
3144  rb->size -= sz;
3145 
3146  /* Update the total size in the top transaction. */
3147  toptxn->total_size -= sz;
3148  }
3149 
3150  Assert(txn->size <= rb->size);
3151 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferTXN * toptxn
#define Assert(condition)
Definition: c.h:804

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *  change)
static

Definition at line 3976 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

3977 {
3978  Size sz = sizeof(ReorderBufferChange);
3979 
3980  switch (change->action)
3981  {
3982  /* fall through these, they're all similar enough */
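3983  case REORDER_BUFFER_CHANGE_INSERT:
3984  case REORDER_BUFFER_CHANGE_UPDATE:
3985  case REORDER_BUFFER_CHANGE_DELETE:
3986  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3987  {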
3988  ReorderBufferTupleBuf *oldtup,
3989  *newtup;
3990  Size oldlen = 0;
3991  Size newlen = 0;
3992 
3993  oldtup = change->data.tp.oldtuple;
3994  newtup = change->data.tp.newtuple;
3995 
3996  if (oldtup)
3997  {
3998  sz += sizeof(HeapTupleData);
3999  oldlen = oldtup->tuple.t_len;
4000  sz += oldlen;
4001  }
4002 
4003  if (newtup)
4004  {
4005  sz += sizeof(HeapTupleData);
4006  newlen = newtup->tuple.t_len;
4007  sz += newlen;
4008  }
4009 
4010  break;
4011  }
4012  case REORDER_BUFFER_CHANGE_MESSAGE:
4013  {
4014  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4015 
4016  sz += prefix_size + change->data.msg.message_size +
4017  sizeof(Size) + sizeof(Size);
4018 
4019  break;
4020  }
4021  case REORDER_BUFFER_CHANGE_INVALIDATION:
4022  {
4023  sz += sizeof(SharedInvalidationMessage) *
4024  change->data.inval.ninvalidations;
4025  break;
4026  }
4027  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4028  {
4029  Snapshot snap;
4030 
4031  snap = change->data.snapshot;
4032 
4033  sz += sizeof(SnapshotData) +
4034  sizeof(TransactionId) * snap->xcnt +
4035  sizeof(TransactionId) * snap->subxcnt;
4036 
4037  break;
4038  }
4039  case REORDER_BUFFER_CHANGE_TRUNCATE:
4040  {
4041  sz += sizeof(Oid) * change->data.truncate.nrelids;
4042 
4043  break;
4044  }
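4045  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4046  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4047  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4048  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4049  /* ReorderBufferChange contains everything important */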
4050  break;
4051  }
4052 
4053  return sz;
4054 }
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *  rb)
static

Definition at line 3452 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3453 {
3454  ReorderBufferTXN *txn;
3455 
3456  /* bail out if we haven't exceeded the memory limit */
3457  if (rb->size < logical_decoding_work_mem * 1024L)
3458  return;
3459 
3460  /*
3461  * Loop until we reach under the memory limit. One might think that just
3462  * by evicting the largest (sub)transaction we will come under the memory
3463  * limit based on assumption that the selected transaction is at least as
3464  * large as the most recent change (which caused us to go over the memory
3465  * limit). However, that is not true because a user can reduce the
3466  * logical_decoding_work_mem to a smaller value before the most recent
3467  * change.
3468  */
3469  while (rb->size >= logical_decoding_work_mem * 1024L)
3470  {
3471  /*
3472  * Pick the largest transaction (or subtransaction) and evict it from
3473  * memory by streaming, if possible. Otherwise, spill to disk.
3474  */
3475  if (ReorderBufferCanStartStreaming(rb) &&
3476  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3477  {
3478  /* we know there has to be one, because the size is not zero */
3479  Assert(txn && !txn->toptxn);
3480  Assert(txn->total_size > 0);
3481  Assert(rb->size >= txn->total_size);
3482 
3483  ReorderBufferStreamTXN(rb, txn);
3484  }
3485  else
3486  {
3487  /*
3488  * Pick the largest transaction (or subtransaction) and evict it
3489  * from memory by serializing it to disk.
3490  */
3491  txn = ReorderBufferLargestTXN(rb);
3492 
3493  /* we know there has to be one, because the size is not zero */
3494  Assert(txn);
3495  Assert(txn->size > 0);
3496  Assert(rb->size >= txn->size);
3497 
3498  ReorderBufferSerializeTXN(rb, txn);
3499  }
3500 
3501  /*
3502  * After eviction, the transaction should have no entries in memory,
3503  * and should use 0 bytes for changes.
3504  */
3505  Assert(txn->size == 0);
3506  Assert(txn->nentries_mem == 0);
3507  }
3508 
3509  /* We must be under the memory limit now. */
3510  Assert(rb->size < logical_decoding_work_mem * 1024L);
3511 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4400 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

4401 {
4402  DIR *spill_dir;
4403  struct dirent *spill_de;
4404  struct stat statbuf;
4405  char path[MAXPGPATH * 2 + 12];
4406 
4407  sprintf(path, "pg_replslot/%s", slotname);
4408 
4409  /* we're only handling directories here, skip if it's not ours */
4410  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4411  return;
4412 
4413  spill_dir = AllocateDir(path);
4414  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4415  {
4416  /* only look at names that can be ours */
4417  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4418  {
4419  snprintf(path, sizeof(path),
4420  "pg_replslot/%s/%s", slotname,
4421  spill_de->d_name);
4422 
4423  if (unlink(path) != 0)
4424  ereport(ERROR,
4425  (errcode_for_file_access(),
4426  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4427  path, slotname)));
4428  }
4429  }
4430  FreeDir(spill_dir);
4431 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2801
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:219
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2720
#define ereport(elevel,...)
Definition: elog.h:157
#define S_ISDIR(m)
Definition: win32_port.h:324
#define lstat(path, sb)
Definition: win32_port.h:284
int errmsg(const char *fmt,...)
Definition: elog.c:909
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:217
int FreeDir(DIR *dir)
Definition: fd.c:2838

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1450 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

1451 {
1452  bool found;
1453  dlist_mutable_iter iter;
1454 
1455  /* cleanup subtransactions & their changes */
1456  dlist_foreach_modify(iter, &txn->subtxns)
1457  {
1458  ReorderBufferTXN *subtxn;
1459 
1460  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1461 
1462  /*
1463  * Subtransactions are always associated to the toplevel TXN, even if
1464  * they originally were happening inside another subtxn, so we won't
1465  * ever recurse more than one level deep here.
1466  */
1467  Assert(rbtxn_is_known_subxact(subtxn));
1468  Assert(subtxn->nsubtxns == 0);
1469 
1470  ReorderBufferCleanupTXN(rb, subtxn);
1471  }
1472 
1473  /* cleanup changes in the txn */
1474  dlist_foreach_modify(iter, &txn->changes)
1475  {
1476  ReorderBufferChange *change;
1477 
1478  change = dlist_container(ReorderBufferChange, node, iter.cur);
1479 
1480  /* Check we're not mixing changes from different transactions. */
1481  Assert(change->txn == txn);
1482 
1483  ReorderBufferReturnChange(rb, change, true);
1484  }
1485 
1486  /*
1487  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1488  * They are always stored in the toplevel transaction.
1489  */
1490  dlist_foreach_modify(iter, &txn->tuplecids)
1491  {
1492  ReorderBufferChange *change;
1493 
1494  change = dlist_container(ReorderBufferChange, node, iter.cur);
1495 
1496  /* Check we're not mixing changes from different transactions. */
1497  Assert(change->txn == txn);
1498  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1499 
1500  ReorderBufferReturnChange(rb, change, true);
1501  }
1502 
1503  /*
1504  * Cleanup the base snapshot, if set.
1505  */
1506  if (txn->base_snapshot != NULL)
1507  {
1508  SnapBuildSnapDecRefcount(txn->base_snapshot);
1509  dlist_delete(&txn->base_snapshot_node);
1510  }
1511 
1512  /*
1513  * Cleanup the snapshot for the last streamed run.
1514  */
1515  if (txn->snapshot_now != NULL)
1516  {
1517  Assert(rbtxn_is_streamed(txn));
1518  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1519  }
1520 
1521  /*
1522  * Remove TXN from its containing list.
1523  *
1524  * Note: if txn is known as subxact, we are deleting the TXN from its
1525  * parent's list of known subxacts; this leaves the parent's nsubtxns
1526  * count too high, but we don't care. Otherwise, we are deleting the TXN
1527  * from the LSN-ordered list of toplevel TXNs.
1528  */
1529  dlist_delete(&txn->node);
1530 
1531  /* now remove reference from buffer */
1532  hash_search(rb->by_txn,
1533  (void *) &txn->xid,
1534  HASH_REMOVE,
1535  &found);
1536  Assert(found);
1537 
1538  /* remove entries spilled to disk */
1539  if (rbtxn_is_serialized(txn))
1540  ReorderBufferRestoreCleanup(rb, txn);
1541 
1542  /* deallocate */
1543  ReorderBufferReturnTXN(rb, txn);
1544 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:804
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:418
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2632 of file reorderbuffer.c.

References InvalidXLogRecPtr, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2636 {
2637  ReorderBufferTXN *txn;
2638 
2639  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2640  false);
2641 
2642  /* unknown transaction, nothing to replay */
2643  if (txn == NULL)
2644  return;
2645 
2646  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2647  origin_id, origin_lsn);
2648 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1134 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

1137 {
1138  ReorderBufferTXN *subtxn;
1139 
1140  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1141  InvalidXLogRecPtr, false);
1142 
1143  /*
1144  * No need to do anything if that subtxn didn't contain any changes
1145  */
1146  if (!subtxn)
1147  return;
1148 
1149  subtxn->final_lsn = commit_lsn;
1150  subtxn->end_lsn = end_lsn;
1151 
1152  /*
1153  * Assign this subxact as a child of the toplevel xact (no-op if already
1154  * done.)
1155  */
1156  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1157 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1749 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1751 {
1752  Snapshot snap;
1753  dlist_iter iter;
1754  int i = 0;
1755  Size size;
1756 
1757  size = sizeof(SnapshotData) +
1758  sizeof(TransactionId) * orig_snap->xcnt +
1759  sizeof(TransactionId) * (txn->nsubtxns + 1);
1760 
1761  snap = MemoryContextAllocZero(rb->context, size);
1762  memcpy(snap, orig_snap, sizeof(SnapshotData));
1763 
1764  snap->copied = true;
1765  snap->active_count = 1; /* mark as active so nobody frees it */
1766  snap->regd_count = 0;
1767  snap->xip = (TransactionId *) (snap + 1);
1768 
1769  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1770 
1771  /*
1772  * snap->subxip contains all txids that belong to our transaction which we
1773  * need to check via cmin/cmax. That's why we store the toplevel
1774  * transaction in there as well.
1775  */
1776  snap->subxip = snap->xip + snap->xcnt;
1777  snap->subxip[i++] = txn->xid;
1778 
1779  /*
1780  * subxcnt isn't decreased when subtransactions abort, so count manually.
1781  * Since it's an upper boundary it is safe to use it for the allocation
1782  * above.
1783  */
1784  snap->subxcnt = 1;
1785 
1786  dlist_foreach(iter, &txn->subtxns)
1787  {
1788  ReorderBufferTXN *sub_txn;
1789 
1790  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1791  snap->subxip[i++] = sub_txn->xid;
1792  snap->subxcnt++;
1793  }
1794 
1795  /* sort so we can bsearch() later */
1796  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1797 
1798  /* store the specified current CommandId */
1799  snap->curcid = cid;
1800 
1801  return snap;
1802 }
uint32 TransactionId
Definition: c.h:587
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:540
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:505
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage *  msgs 
)
static

Definition at line 3256 of file reorderbuffer.c.

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

3257 {
3258  int i;
3259 
3260  for (i = 0; i < nmsgs; i++)
3261  LocalExecuteInvalidationMessage(&msgs[i]);
3262 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:600
int i

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2738 of file reorderbuffer.c.

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

2743 {
2744  ReorderBufferTXN *txn;
2745  XLogRecPtr prepare_end_lsn;
2746  TimestampTz prepare_time;
2747 
2748  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2749 
2750  /* unknown transaction, nothing to do */
2751  if (txn == NULL)
2752  return;
2753 
2754  /*
2755  * By this time the txn has the prepare record information, remember it to
2756  * be later used for rollback.
2757  */
2758  prepare_end_lsn = txn->end_lsn;
2759  prepare_time = txn->xact_time.prepare_time;
2760 
2761  /* add the gid in the txn */
2762  txn->gid = pstrdup(gid);
2763 
2764  /*
2765  * It is possible that this transaction is not decoded at prepare time
2766  * either because by that time we didn't have a consistent snapshot, or
2767  * two_phase was not enabled, or it was decoded earlier but we have
2768  * restarted. We only need to send the prepare if it was not decoded
2769  * earlier. We don't need to decode the xact for aborts if it is not done
2770  * already.
2771  */
2772  if ((txn->final_lsn < two_phase_at) && is_commit)
2773  {
2774  txn->txn_flags |= RBTXN_PREPARE;
2775 
2776  /*
2777  * The prepare info must have been updated in txn even if we skip
2778  * prepare.
2779  */
2780  Assert(txn->final_lsn != InvalidXLogRecPtr);
2781 
2782  /*
2783  * By this time the txn has the prepare record information and it is
2784  * important to use that so that downstream gets the accurate
2785  * information. If instead, we have passed commit information here
2786  * then downstream can behave as it has already replayed commit
2787  * prepared after the restart.
2788  */
2789  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2790  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2791  }
2792 
2793  txn->final_lsn = commit_lsn;
2794  txn->end_lsn = end_lsn;
2795  txn->xact_time.commit_time = commit_time;
2796  txn->origin_id = origin_id;
2797  txn->origin_lsn = origin_lsn;
2798 
2799  if (is_commit)
2800  rb->commit_prepared(rb, txn, commit_lsn);
2801  else
2802  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2803 
2804  /* cleanup: make sure there's no cache pollution */
2805  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2806  txn->invalidations);
2807  ReorderBufferCleanupTXN(rb, txn);
2808 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferTXN::@103 xact_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
ReorderBufferRollbackPreparedCB rollback_prepared
SharedInvalidationMessage * invalidations
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2908 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2909 {
2910  ReorderBufferTXN *txn;
2911 
2912  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2913  false);
2914 
2915  /* unknown, nothing to forget */
2916  if (txn == NULL)
2917  return;
2918 
2919  /* For streamed transactions notify the remote node about the abort. */
2920  if (rbtxn_is_streamed(txn))
2921  rb->stream_abort(rb, txn, lsn);
2922 
2923  /* cosmetic... */
2924  txn->final_lsn = lsn;
2925 
2926  /*
2927  * Process cache invalidation messages if there are any. Even if we're not
2928  * interested in the transaction's contents, it could have manipulated the
2929  * catalog and we need to update the caches according to that.
2930  */
2931  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2932  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2933  txn->invalidations);
2934  else
2935  Assert(txn->ninvalidations == 0);
2936 
2937  /* remove potential on-disk data, and deallocate */
2938  ReorderBufferCleanupTXN(rb, txn);
2939 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * rb)

Definition at line 377 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

378 {
379  MemoryContext context = rb->context;
380 
381  /*
382  * We free separately allocated data by entirely scrapping reorderbuffer's
383  * memory context.
384  */
385  MemoryContextDelete(context);
386 
387  /* Free disk space used by unconsumed reorder buffers */
388  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
389 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
ReplicationSlotPersistentData data
Definition: slot.h:147
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:681
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer *  rb,
Snapshot  snap 
)
static

Definition at line 1808 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1809 {
1810  if (snap->copied)
1811  pfree(snap);
1812  else
1813  SnapBuildSnapDecRefcount(snap);
1814 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1169
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:418

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * rb)

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * rb)

Definition at line 959 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

960 {
961  ReorderBufferTXN *txn;
962 
963  AssertTXNLsnOrder(rb);
964 
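965  if (dlist_is_empty(&rb->toplevel_by_lsn))
966  return NULL;
967 
968  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
969 
970  Assert(!rbtxn_is_known_subxact(txn));
971  Assert(txn->first_lsn != InvalidXLogRecPtr);
972  return txn;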
973 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 987 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

988 {
989  ReorderBufferTXN *txn;
990 
991  AssertTXNLsnOrder(rb);
992 
993  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
994  return InvalidTransactionId;
995 
996  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
997  &rb->txns_by_base_snapshot_lsn);
998  return txn->base_snapshot->xmin;
999 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer *  rb,
int  nrelids 
)

Definition at line 579 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

580 {
581  Oid *relids;
582  Size alloc_len;
583 
584  alloc_len = sizeof(Oid) * nrelids;
585 
586  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
587 
588  return relids;
589 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer *  rb,
Size  tuple_len 
)

Definition at line 543 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

544 {
545  ReorderBufferTupleBuf *tuple;
546  Size alloc_len;
547 
548  alloc_len = tuple_len + SizeofHeapTupleHeader;
549 
550  tuple = (ReorderBufferTupleBuf *)
551  MemoryContextAlloc(rb->tup_context,
552  sizeof(ReorderBufferTupleBuf) +
553  MAXIMUM_ALIGNOF + alloc_len);
554  tuple->alloc_tuple_size = alloc_len;
555  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
556 
557  return tuple;
558 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer * rb)
static

Definition at line 395 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

396 {
397  ReorderBufferTXN *txn;
398 
399  txn = (ReorderBufferTXN *)
400  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
401 
402  memset(txn, 0, sizeof(ReorderBufferTXN));
403 
404  dlist_init(&txn->changes);
405  dlist_init(&txn->tuplecids);
406  dlist_init(&txn->subtxns);
407 
408  /* InvalidCommandId is not zero, so set it explicitly */
409  txn->command_id = InvalidCommandId;
410  txn->output_plugin_private = NULL;
411 
412  return txn;
413 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:604
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
void * output_plugin_private
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer *  rb,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 2981 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), ReorderBufferForget(), and ReorderBufferInvalidate().

2983 {
2984  bool use_subtxn = IsTransactionOrTransactionBlock();
2985  int i;
2986 
2987  if (use_subtxn)
2988  BeginInternalSubTransaction("replay");
2989 
2990  /*
2991  * Force invalidations to happen outside of a valid transaction - that way
2992  * entries will just be marked as invalid without accessing the catalog.
2993  * That's advantageous because we don't need to setup the full state
2994  * necessary for catalog access.
2995  */
2996  if (use_subtxn)
2997  AbortCurrentTransaction();
2998 
2999  for (i = 0; i < ninvalidations; i++)
3000  LocalExecuteInvalidationMessage(&invalidations[i]);
3001 
3002  if (use_subtxn)
3003  RollbackAndReleaseCurrentSubTransaction();
3004 }
void AbortCurrentTransaction(void)
Definition: xact.c:3224
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4715
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4526
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4421
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:600
int i

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2950 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2951 {
2952  ReorderBufferTXN *txn;
2953 
2954  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2955  false);
2956 
2957  /* unknown, nothing to do */
2958  if (txn == NULL)
2959  return;
2960 
2961  /*
2962  * Process cache invalidation messages if there are any. Even if we're not
2963  * interested in the transaction's contents, it could have manipulated the
2964  * catalog and we need to update the caches according to that.
2965  */
2966  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2967  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2968  txn->invalidations);
2969  else
2970  Assert(txn->ninvalidations == 0);
2971 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1176 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1177 {
1178  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1179  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1180  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1181 
1182  if (pos_a < pos_b)
1183  return 1;
1184  else if (pos_a == pos_b)
1185  return 0;
1186  return -1;
1187 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:516
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1419 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1421 {
1422  int32 off;
1423 
1424  for (off = 0; off < state->nr_txns; off++)
1425  {
1426  if (state->entries[off].file.vfd != -1)
1427  FileClose(state->entries[off].file.vfd);
1428  }
1429 
1430  /* free memory we might have "leaked" in the last *Next call */
1431  if (!dlist_is_empty(&state->old_change))
1432  {
1433  ReorderBufferChange *change;
1434 
1435  change = dlist_container(ReorderBufferChange, node,
1436  dlist_pop_head_node(&state->old_change));
1437  ReorderBufferReturnChange(rb, change, true);
1438  Assert(dlist_is_empty(&state->old_change));
1439  }
1440 
1441  binaryheap_free(state->heap);
1442  pfree(state);
1443 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
void FileClose(File file)
Definition: fd.c:1959
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1199 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1201 {
1202  Size nr_txns = 0;
1203  ReorderBufferIterTXNState *state;
1204  dlist_iter cur_txn_i;
1205  int32 off;
1206 
1207  *iter_state = NULL;
1208 
1209  /* Check ordering of changes in the toplevel transaction. */
1210  AssertChangeLsnOrder(txn);
1211 
1212  /*
1213  * Calculate the size of our heap: one element for every transaction that
1214  * contains changes. (Besides the transactions already in the reorder
1215  * buffer, we count the one we were directly passed.)
1216  */
1217  if (txn->nentries > 0)
1218  nr_txns++;
1219 
1220  dlist_foreach(cur_txn_i, &txn->subtxns)
1221  {
1222  ReorderBufferTXN *cur_txn;
1223 
1224  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1225 
1226  /* Check ordering of changes in this subtransaction. */
1227  AssertChangeLsnOrder(cur_txn);
1228 
1229  if (cur_txn->nentries > 0)
1230  nr_txns++;
1231  }
1232 
1233  /* allocate iteration state */
1234  state = (ReorderBufferIterTXNState *)
1235  MemoryContextAllocZero(rb->context,
1236  sizeof(ReorderBufferIterTXNState) +
1237  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1238 
1239  state->nr_txns = nr_txns;
1240  dlist_init(&state->old_change);
1241 
1242  for (off = 0; off < state->nr_txns; off++)
1243  {
1244  state->entries[off].file.vfd = -1;
1245  state->entries[off].segno = 0;
1246  }
1247 
1248  /* allocate heap */
1249  state->heap = binaryheap_allocate(state->nr_txns,
1250  ReorderBufferIterCompare,
1251  state);
1252 
1253  /* Now that the state fields are initialized, it is safe to return it. */
1254  *iter_state = state;
1255 
1256  /*
1257  * Now insert items into the binary heap, in an unordered fashion. (We
1258  * will run a heap assembly step at the end; this is more efficient.)
1259  */
1260 
1261  off = 0;
1262 
1263  /* add toplevel transaction if it contains changes */
1264  if (txn->nentries > 0)
1265  {
1266  ReorderBufferChange *cur_change;
1267 
1268  if (rbtxn_is_serialized(txn))
1269  {
1270  /* serialize remaining changes */
1271  ReorderBufferSerializeTXN(rb, txn);
1272  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1273  &state->entries[off].segno);
1274  }
1275 
1276  cur_change = dlist_head_element(ReorderBufferChange, node,
1277  &txn->changes);
1278 
1279  state->entries[off].lsn = cur_change->lsn;
1280  state->entries[off].change = cur_change;
1281  state->entries[off].txn = txn;
1282 
1283  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1284  }
1285 
1286  /* add subtransactions if they contain changes */
1287  dlist_foreach(cur_txn_i, &txn->subtxns)
1288  {
1289  ReorderBufferTXN *cur_txn;
1290 
1291  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1292 
1293  if (cur_txn->nentries > 0)
1294  {
1295  ReorderBufferChange *cur_change;
1296 
1297  if (rbtxn_is_serialized(cur_txn))
1298  {
1299  /* serialize remaining changes */
1300  ReorderBufferSerializeTXN(rb, cur_txn);
1301  ReorderBufferRestoreChanges(rb, cur_txn,
1302  &state->entries[off].file,
1303  &state->entries[off].segno);
1304  }
1305  cur_change = dlist_head_element(ReorderBufferChange, node,
1306  &cur_txn->changes);
1307 
1308  state->entries[off].lsn = cur_change->lsn;
1309  state->entries[off].change = cur_change;
1310  state->entries[off].txn = cur_txn;
1311 
1312  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1313  }
1314  }
1315 
1316  /* assemble a valid binary heap */
1317  binaryheap_build(state->heap);
1318 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:317
size_t Size
Definition: c.h:540
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:523
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1327 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1328 {
1329  ReorderBufferChange *change;
1330  ReorderBufferIterTXNEntry *entry;
1331  int32 off;
1332 
1333  /* nothing there anymore */
1334  if (state->heap->bh_size == 0)
1335  return NULL;
1336 
1337  off = DatumGetInt32(binaryheap_first(state->heap));
1338  entry = &state->entries[off];
1339 
1340  /* free memory we might have "leaked" in the previous *Next call */
1341  if (!dlist_is_empty(&state->old_change))
1342  {
1343  change = dlist_container(ReorderBufferChange, node,
1344  dlist_pop_head_node(&state->old_change));
1345  ReorderBufferReturnChange(rb, change, true);
1346  Assert(dlist_is_empty(&state->old_change));
1347  }
1348 
1349  change = entry->change;
1350 
1351  /*
1352  * update heap with information about which transaction has the next
1353  * relevant change in LSN order
1354  */
1355 
1356  /* there are in-memory changes */
1357  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1358  {
1359  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1360  ReorderBufferChange *next_change =
1361  dlist_container(ReorderBufferChange, node, next);
1362 
1363  /* txn stays the same */
1364  state->entries[off].lsn = next_change->lsn;
1365  state->entries[off].change = next_change;
1366 
1367  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1368  return change;
1369  }
1370 
1371  /* try to load changes from disk */
1372  if (entry->txn->nentries != entry->txn->nentries_mem)
1373  {
1374  /*
1375  * Ugly: restoring changes will reuse *Change records, thus delete the
1376  * current one from the per-tx list and only free in the next call.
1377  */
1378  dlist_delete(&change->node);
1379  dlist_push_tail(&state->old_change, &change->node);
1380 
1381  /*
1382  * Update the total bytes processed by the txn for which we are
1383  * releasing the current set of changes and restoring the new set of
1384  * changes.
1385  */
1386  rb->totalBytes += entry->txn->size;
1387  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1388  &state->entries[off].segno))
1389  {
1390  /* successfully restored changes from disk */
1391  ReorderBufferChange *next_change =
1392  dlist_head_element(ReorderBufferChange, node,
1393  &entry->txn->changes);
1394 
1395  elog(DEBUG2, "restored %u/%u changes from disk",
1396  (uint32) entry->txn->nentries_mem,
1397  (uint32) entry->txn->nentries);
1398 
1399  Assert(entry->txn->nentries_mem);
1400  /* txn stays the same */
1401  state->entries[off].lsn = next_change->lsn;
1402  state->entries[off].change = next_change;
1403  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1404 
1405  return change;
1406  }
1407  }
1408 
1409  /* ok, no changes there anymore, remove */
1410  binaryheap_remove_first(state->heap);
1411 
1412  return change;
1413 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:516
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:429
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:523
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:232
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer * rb)
static

Definition at line 3412 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

3413 {
3414  dlist_iter iter;
3415  Size largest_size = 0;
3416  ReorderBufferTXN *largest = NULL;
3417 
3418  /* Find the largest top-level transaction having a base snapshot. */
3419  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3420  {
3421  ReorderBufferTXN *txn;
3422 
3423  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3424 
3425  /* must not be a subtxn */
3426  Assert(!rbtxn_is_known_subxact(txn));
3427  /* base_snapshot must be set */
3428  Assert(txn->base_snapshot != NULL);
3429 
3430  if ((largest == NULL || txn->total_size > largest_size) &&
3431  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3432  {
3433  largest = txn;
3434  largest_size = txn->total_size;
3435  }
3436  }
3437 
3438  return largest;
3439 }
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define rbtxn_has_partial_change(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer * rb)
static

Definition at line 3365 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3366 {
3367  HASH_SEQ_STATUS hash_seq;
3368  ReorderBufferTXNByIdEnt *ent;
3369  ReorderBufferTXN *largest = NULL;
3370 
3371  hash_seq_init(&hash_seq, rb->by_txn);
3372  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3373  {
3374  ReorderBufferTXN *txn = ent->txn;
3375 
3376  /* if the current transaction is larger, remember it */
3377  if ((!largest) || (txn->size > largest->size))
3378  largest = txn;
3379  }
3380 
3381  Assert(largest);
3382  Assert(largest->size > 0);
3383  Assert(largest->size <= rb->size);
3384 
3385  return largest;
3386 }
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
ReorderBufferTXN * txn

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2701 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

2703 {
2704  ReorderBufferTXN *txn;
2705 
2706  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2707  false);
2708 
2709  /* unknown transaction, nothing to replay */
2710  if (txn == NULL)
2711  return;
2712 
2713  txn->txn_flags |= RBTXN_PREPARE;
2714  txn->gid = pstrdup(gid);
2715 
2716  /* The prepare info must have been updated in txn by now. */
2717  Assert(txn->final_lsn != InvalidXLogRecPtr);
2718 
2719  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2720  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2721 
2722  /*
2723  * We send the prepare for the concurrently aborted xacts so that later
2724  * when rollback prepared is decoded and sent, the downstream should be
2725  * able to rollback such a xact. See comments atop DecodePrepare.
2726  *
2727  * Note, for the concurrent_abort + streaming case a stream_prepare was
2728  * already sent within the ReorderBufferReplay call above.
2729  */
2730  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2731  rb->prepare(rb, txn, txn->final_lsn);
2732 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TimestampTz prepare_time
ReorderBufferPrepareCB prepare

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
ReorderBufferChange * change,
bool  toast_insert 
)
static

Definition at line 695 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferChange::tp, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

698 {
699  ReorderBufferTXN *toptxn;
700 
701  /*
702  * The partial changes need to be processed only while streaming
703  * in-progress transactions.
704  */
705  if (!ReorderBufferCanStream(rb))
706  return;
707 
708  /* Get the top transaction. */
709  if (txn->toptxn != NULL)
710  toptxn = txn->toptxn;
711  else
712  toptxn = txn;
713 
714  /*
715  * Indicate a partial change for toast inserts. The change will be
716  * considered as complete once we get the insert or update on the main
717  * table and we are sure that the pending toast chunks are not required
718  * anymore.
719  *
720  * If we allow streaming when there are pending toast chunks then such
721  * chunks won't be released till the insert (multi_insert) is complete and
722  * we expect the txn to have streamed all changes after streaming. This
723  * restriction is mainly to ensure the correctness of streamed
724  * transactions and it doesn't seem worth uplifting such a restriction
725  * just to allow this case because anyway we will stream the transaction
726  * once such an insert is complete.
727  */
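728  if (toast_insert)
729  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
730  else if (rbtxn_has_partial_change(toptxn) &&
731  IsInsertOrUpdate(change->action) &&
732  change->data.tp.clear_toast_afterwards)
733  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;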
734 
735  /*
736  * Indicate a partial change for speculative inserts. The change will be
737  * considered as complete once we get the speculative confirm or abort
738  * token.
739  */
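740  if (IsSpecInsert(change->action))
741  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
742  else if (rbtxn_has_partial_change(toptxn) &&
743  IsSpecConfirmOrAbort(change->action))
744  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;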
745 
746  /*
747  * Stream the transaction if it is serialized before and the changes are
748  * now complete in the top-level transaction.
749  *
750  * The reason for doing the streaming of such a transaction as soon as we
751  * get the complete change for it is that previously it would have reached
752  * the memory threshold and wouldn't get streamed because of incomplete
753  * changes. Delaying such transactions would increase apply lag for them.
754  */
755  if (ReorderBufferCanStartStreaming(rb) &&
756  !(rbtxn_has_partial_change(toptxn)) &&
757  rbtxn_is_serialized(txn))
758  ReorderBufferStreamTXN(rb, toptxn);
759 }
#define rbtxn_has_partial_change(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
union ReorderBufferChange::@97 data
#define RBTXN_HAS_PARTIAL_CHANGE
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferChange::@97::@98 tp
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2022 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), 
ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

2027 {
2028  bool using_subtxn;
2029  MemoryContext ccxt = CurrentMemoryContext;
2030  ReorderBufferIterTXNState *volatile iterstate = NULL;
2031  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2032  ReorderBufferChange *volatile specinsert = NULL;
2033  volatile bool stream_started = false;
2034  ReorderBufferTXN *volatile curtxn = NULL;
2035 
2036  /* build data to be able to lookup the CommandIds of catalog tuples */
2037  ReorderBufferBuildTupleCidHash(rb, txn);
2038 
2039  /* setup the initial snapshot */
2040  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2041 
2042  /*
2043  * Decoding needs access to syscaches et al., which in turn use
2044  * heavyweight locks and such. Thus we need to have enough state around to
2045  * keep track of those. The easiest way is to simply use a transaction
2046  * internally. That also allows us to easily enforce that nothing writes
2047  * to the database by checking for xid assignments.
2048  *
2049  * When we're called via the SQL SRF there's already a transaction
2050  * started, so start an explicit subtransaction there.
2051  */
2052  using_subtxn = IsTransactionOrTransactionBlock();
2053 
2054  PG_TRY();
2055  {
2056  ReorderBufferChange *change;
2057 
2058  if (using_subtxn)
2059  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2060  else
2061  StartTransactionCommand();
2062 
2063  /*
2064  * We only need to send begin/begin-prepare for non-streamed
2065  * transactions.
2066  */
2067  if (!streaming)
2068  {
2069  if (rbtxn_prepared(txn))
2070  rb->begin_prepare(rb, txn);
2071  else
2072  rb->begin(rb, txn);
2073  }
2074 
2075  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2076  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2077  {
2078  Relation relation = NULL;
2079  Oid reloid;
2080 
2081  /*
2082  * We can't call start stream callback before processing first
2083  * change.
2084  */
2085  if (prev_lsn == InvalidXLogRecPtr)
2086  {
2087  if (streaming)
2088  {
2089  txn->origin_id = change->origin_id;
2090  rb->stream_start(rb, txn, change->lsn);
2091  stream_started = true;
2092  }
2093  }
2094 
2095  /*
2096  * Enforce correct ordering of changes, merged from multiple
2097  * subtransactions. The changes may have the same LSN due to
2098  * MULTI_INSERT xlog records.
2099  */
2100  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2101 
2102  prev_lsn = change->lsn;
2103 
2104  /*
2105  * Set the current xid to detect concurrent aborts. This is
2106  * required for the cases when we decode the changes before the
2107  * COMMIT record is processed.
2108  */
2109  if (streaming || rbtxn_prepared(change->txn))
2110  {
2111  curtxn = change->txn;
2112  SetupCheckXidLive(curtxn->xid);
2113  }
2114 
2115  switch (change->action)
2116  {
2117  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2118 
2119  /*
2120  * Confirmation for speculative insertion arrived. Simply
2121  * use as a normal record. It'll be cleaned up at the end
2122  * of INSERT processing.
2123  */
2124  if (specinsert == NULL)
2125  elog(ERROR, "invalid ordering of speculative insertion changes");
2126  Assert(specinsert->data.tp.oldtuple == NULL);
2127  change = specinsert;
2128  change->action = REORDER_BUFFER_CHANGE_INSERT;
2129 
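2130  /* intentionally fall through */
2131  case REORDER_BUFFER_CHANGE_INSERT:
2132  case REORDER_BUFFER_CHANGE_UPDATE:
2133  case REORDER_BUFFER_CHANGE_DELETE: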
2134  Assert(snapshot_now);
2135 
2136  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2137  change->data.tp.relnode.relNode);
2138 
2139  /*
2140  * Mapped catalog tuple without data, emitted while
2141  * catalog table was in the process of being rewritten. We
2142  * can fail to look up the relfilenode, because the
2143  * relmapper has no "historic" view, in contrast to the
2144  * normal catalog during decoding. Thus repeated rewrites
2145  * can cause a lookup failure. That's OK because we do not
2146  * decode catalog changes anyway. Normally such tuples
2147  * would be skipped over below, but we can't identify
2148  * whether the table should be logically logged without
2149  * mapping the relfilenode to the oid.
2150  */
2151  if (reloid == InvalidOid &&
2152  change->data.tp.newtuple == NULL &&
2153  change->data.tp.oldtuple == NULL)
2154  goto change_done;
2155  else if (reloid == InvalidOid)
2156  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2157  relpathperm(change->data.tp.relnode,
2158  MAIN_FORKNUM));
2159 
2160  relation = RelationIdGetRelation(reloid);
2161 
2162  if (!RelationIsValid(relation))
2163  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2164  reloid,
2165  relpathperm(change->data.tp.relnode,
2166  MAIN_FORKNUM));
2167 
2168  if (!RelationIsLogicallyLogged(relation))
2169  goto change_done;
2170 
2171  /*
2172  * Ignore temporary heaps created during DDL unless the
2173  * plugin has asked for them.
2174  */
2175  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2176  goto change_done;
2177 
2178  /*
2179  * For now ignore sequence changes entirely. Most of the
2180  * time they don't log changes using records we
2181  * understand, so it doesn't make sense to handle the few
2182  * cases we do.
2183  */
2184  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2185  goto change_done;
2186 
2187  /* user-triggered change */
2188  if (!IsToastRelation(relation))
2189  {
2190  ReorderBufferToastReplace(rb, txn, relation, change);
2191  ReorderBufferApplyChange(rb, txn, relation, change,
2192  streaming);
2193 
2194  /*
2195  * Only clear reassembled toast chunks if we're sure
2196  * they're not required anymore. The creator of the
2197  * tuple tells us.
2198  */
2199  if (change->data.tp.clear_toast_afterwards)
2200  ReorderBufferToastReset(rb, txn);
2201  }
2202  /* we're not interested in toast deletions */
2203  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2204  {
2205  /*
2206  * Need to reassemble the full toasted Datum in
2207  * memory, to ensure the chunks don't get reused till
2208  * we're done remove it from the list of this
2209  * transaction's changes. Otherwise it will get
2210  * freed/reused while restoring spooled data from
2211  * disk.
2212  */
2213  Assert(change->data.tp.newtuple != NULL);
2214 
2215  dlist_delete(&change->node);
2216  ReorderBufferToastAppendChunk(rb, txn, relation,
2217  change);
2218  }
2219 
2220  change_done:
2221 
2222  /*
2223  * If speculative insertion was confirmed, the record
2224  * isn't needed anymore.
2225  */
2226  if (specinsert != NULL)
2227  {
2228  ReorderBufferReturnChange(rb, specinsert, true);
2229  specinsert = NULL;
2230  }
2231 
2232  if (RelationIsValid(relation))
2233  {
2234  RelationClose(relation);
2235  relation = NULL;
2236  }
2237  break;
2238 
2239  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2240 
2241  /*
2242  * Speculative insertions are dealt with by delaying the
2243  * processing of the insert until the confirmation record
2244  * arrives. For that we simply unlink the record from the
2245  * chain, so it does not get freed/reused while restoring
2246  * spooled data from disk.
2247  *
2248  * This is safe in the face of concurrent catalog changes
2249  * because the relevant relation can't be changed between
2250  * speculative insertion and confirmation due to
2251  * CheckTableNotInUse() and locking.
2252  */
2253 
2254  /* clear out a pending (and thus failed) speculation */
2255  if (specinsert != NULL)
2256  {
2257  ReorderBufferReturnChange(rb, specinsert, true);
2258  specinsert = NULL;
2259  }
2260 
2261  /* and memorize the pending insertion */
2262  dlist_delete(&change->node);
2263  specinsert = change;
2264  break;
2265 
2266  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2267 
2268  /*
2269  * Abort for speculative insertion arrived. So cleanup the
2270  * specinsert tuple and toast hash.
2271  *
2272  * Note that we get the spec abort change for each toast
2273  * entry but we need to perform the cleanup only the first
2274  * time we get it for the main table.
2275  */
2276  if (specinsert != NULL)
2277  {
2278  /*
2279  * We must clean the toast hash before processing a
2280  * completely new tuple to avoid confusion about the
2281  * previous tuple's toast chunks.
2282  */
2283  Assert(change->data.tp.clear_toast_afterwards);
2284  ReorderBufferToastReset(rb, txn);
2285 
2286  /* We don't need this record anymore. */
2287  ReorderBufferReturnChange(rb, specinsert, true);
2288  specinsert = NULL;
2289  }
2290  break;
2291 
2292  case REORDER_BUFFER_CHANGE_TRUNCATE:
2293  {
2294  int i;
2295  int nrelids = change->data.truncate.nrelids;
2296  int nrelations = 0;
2297  Relation *relations;
2298 
2299  relations = palloc0(nrelids * sizeof(Relation));
2300  for (i = 0; i < nrelids; i++)
2301  {
2302  Oid relid = change->data.truncate.relids[i];
2303  Relation relation;
2304 
2305  relation = RelationIdGetRelation(relid);
2306 
2307  if (!RelationIsValid(relation))
2308  elog(ERROR, "could not open relation with OID %u", relid);
2309 
2310  if (!RelationIsLogicallyLogged(relation))
2311  continue;
2312 
2313  relations[nrelations++] = relation;
2314  }
2315 
2316  /* Apply the truncate. */
2317  ReorderBufferApplyTruncate(rb, txn, nrelations,
2318  relations, change,
2319  streaming);
2320 
2321  for (i = 0; i < nrelations; i++)
2322  RelationClose(relations[i]);
2323 
2324  break;
2325  }
2326 
2327  case REORDER_BUFFER_CHANGE_MESSAGE:
2328  ReorderBufferApplyMessage(rb, txn, change, streaming);
2329  break;
2330 
2331  case REORDER_BUFFER_CHANGE_INVALIDATION:
2332  /* Execute the invalidation messages locally */
2333  ReorderBufferExecuteInvalidations(
2334  change->data.inval.ninvalidations,
2335  change->data.inval.invalidations);
2336  break;
2337 
2338  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2339  /* get rid of the old */
2340  TeardownHistoricSnapshot(false);
2341 
2342  if (snapshot_now->copied)
2343  {
2344  ReorderBufferFreeSnap(rb, snapshot_now);
2345  snapshot_now =
2346  ReorderBufferCopySnap(rb, change->data.snapshot,
2347  txn, command_id);
2348  }
2349 
2350  /*
2351  * Restored from disk, need to be careful not to double
2352  * free. We could introduce refcounting for that, but for
2353  * now this seems infrequent enough not to care.
2354  */
2355  else if (change->data.snapshot->copied)
2356  {
2357  snapshot_now =
2358  ReorderBufferCopySnap(rb, change->data.snapshot,
2359  txn, command_id);
2360  }
2361  else
2362  {
2363  snapshot_now = change->data.snapshot;
2364  }
2365 
2366  /* and continue with the new one */
2367  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2368  break;
2369 
2370  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2371  Assert(change->data.command_id != InvalidCommandId);
2372 
2373  if (command_id < change->data.command_id)
2374  {
2375  command_id = change->data.command_id;
2376 
2377  if (!snapshot_now->copied)
2378  {
2379  /* we don't use the global one anymore */
2380  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2381  txn, command_id);
2382  }
2383 
2384  snapshot_now->curcid = command_id;
2385 
2386  TeardownHistoricSnapshot(false);
2387  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2388  }
2389 
2390  break;
2391 
2392  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2393  elog(ERROR, "tuplecid value in changequeue");
2394  break;
2395  }
2396  }
2397 
2398  /* speculative insertion record must be freed by now */
2399  Assert(!specinsert);
2400 
2401  /* clean up the iterator */
2402  ReorderBufferIterTXNFinish(rb, iterstate);
2403  iterstate = NULL;
2404 
2405  /*
2406  * Update total transaction count and total bytes processed by the
2407  * transaction and its subtransactions. Ensure to not count the
2408  * streamed transaction multiple times.
2409  *
2410  * Note that the statistics computation has to be done after
2411  * ReorderBufferIterTXNFinish as it releases the serialized change
2412  * which we have already accounted in ReorderBufferIterTXNNext.
2413  */
2414  if (!rbtxn_is_streamed(txn))
2415  rb->totalTxns++;
2416 
2417  rb->totalBytes += txn->total_size;
2418 
2419  /*
2420  * Done with current changes, send the last message for this set of
2421  * changes depending upon streaming mode.
2422  */
2423  if (streaming)
2424  {
2425  if (stream_started)
2426  {
2427  rb->stream_stop(rb, txn, prev_lsn);
2428  stream_started = false;
2429  }
2430  }
2431  else
2432  {
2433  /*
2434  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2435  * regular ones).
2436  */
2437  if (rbtxn_prepared(txn))
2438  rb->prepare(rb, txn, commit_lsn);
2439  else
2440  rb->commit(rb, txn, commit_lsn);
2441  }
2442 
2443  /* this is just a sanity check against bad output plugin behaviour */
2444  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2445  elog(ERROR, "output plugin used XID %u",
2446  GetCurrentTransactionId());
2447 
2448  /*
2449  * Remember the command ID and snapshot for the next set of changes in
2450  * streaming mode.
2451  */
2452  if (streaming)
2453  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2454  else if (snapshot_now->copied)
2455  ReorderBufferFreeSnap(rb, snapshot_now);
2456 
2457  /* cleanup */
2458  TeardownHistoricSnapshot(false);
2459 
2460  /*
2461  * Aborting the current (sub-)transaction as a whole has the right
2462  * semantics. We want all locks acquired in here to be released, not
2463  * reassigned to the parent and we do not want any database access
2464  * have persistent effects.
2465  */
2466  AbortCurrentTransaction();
2467 
2468  /* make sure there's no cache pollution */
2469  ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2470 
2471  if (using_subtxn)
2472  RollbackAndReleaseCurrentSubTransaction();
2473 
2474  /*
2475  * We are here due to one of the four reasons: 1. Decoding an
2476  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2477  * prepared txn that was (partially) streamed. 4. Decoding a committed
2478  * txn.
2479  *
2480  * For 1, we allow truncation of txn data by removing the changes
2481  * already streamed but still keeping other things like invalidations,
2482  * snapshot, and tuplecids. For 2 and 3, we indicate
2483  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2484  * data as the entire transaction has been decoded except for commit.
2485  * For 4, as the entire txn has been decoded, we can fully clean up
2486  * the TXN reorder buffer.
2487  */
2488  if (streaming || rbtxn_prepared(txn))
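2489  {
2490  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2491  /* Reset the CheckXidAlive */
2492  CheckXidAlive = InvalidTransactionId;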
2493  }
2494  else
2495  ReorderBufferCleanupTXN(rb, txn);
2496  }
2497  PG_CATCH();
2498  {
2499  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2500  ErrorData *errdata = CopyErrorData();
2501 
2502  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2503  if (iterstate)
2504  ReorderBufferIterTXNFinish(rb, iterstate);
2505 
2506  TeardownHistoricSnapshot(true);
2507 
2508  /*
2509  * Force cache invalidation to happen outside of a valid transaction
2510  * to prevent catalog access as we just caught an error.
2511  */
2512  AbortCurrentTransaction();
2513 
2514  /* make sure there's no cache pollution */
2515  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2516  txn->invalidations);
2517 
2518  if (using_subtxn)
2519  RollbackAndReleaseCurrentSubTransaction();
2520 
2521  /*
2522  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2523  * abort of the (sub)transaction we are streaming or preparing. We
2524  * need to do the cleanup and return gracefully on this error, see
2525  * SetupCheckXidLive.
2526  *
2527  * This error code can be thrown by one of the callbacks we call
2528  * during decoding so we need to ensure that we return gracefully only
2529  * when we are sending the data in streaming mode and the streaming is
2530  * not finished yet or when we are sending the data out on a PREPARE
2531  * during a two-phase commit.
2532  */
2533  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2534  (stream_started || rbtxn_prepared(txn)))
2535  {
2536  /* curtxn must be set for streaming or prepared transactions */
2537  Assert(curtxn);
2538 
2539  /* Cleanup the temporary error state. */
2540  FlushErrorState();
2541  FreeErrorData(errdata);
2542  errdata = NULL;
2543  curtxn->concurrent_abort = true;
2544 
2545  /* Reset the TXN so that it is allowed to stream remaining data. */
2546  ReorderBufferResetTXN(rb, txn, snapshot_now,
2547  command_id, prev_lsn,
2548  specinsert);
2549  }
2550  else
2551  {
2552  ReorderBufferCleanupTXN(rb, txn);
2553  MemoryContextSwitchTo(ecxt);
2554  PG_RE_THROW();
2555  }
2556  }
2557  PG_END_TRY();
2558 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3224
bool IsToastRelation(Relation relation)
Definition: catalog.c:146
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_prepared(txn)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:381
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4715
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2066
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:674
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void FlushErrorState(void)
Definition: elog.c:1654
#define rbtxn_is_streamed(txn)
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
#define ERROR
Definition: elog.h:46
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define RelationIsValid(relation)
Definition: rel.h:450
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:439
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4526
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:456
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferBeginCB begin_prepare
void RelationClose(Relation relation)
Definition: relcache.c:2101
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:90
TransactionId CheckXidAlive
Definition: xact.c:96
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:1093
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:604
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
#define PG_CATCH()
Definition: elog.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2852
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4421
SharedInvalidationMessage * invalidations
#define PG_RE_THROW()
Definition: elog.h:354
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2050
#define elog(elevel,...)
Definition: elog.h:232
int i
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:313
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1995
#define PG_END_TRY()
Definition: elog.h:338
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3017 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

3018 {
3019  /* many records won't have an xid assigned, centralize check here */
3020  if (xid != InvalidTransactionId)
3021  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3022 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 766 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

768 {
769  ReorderBufferTXN *txn;
770 
771  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
772 
773  /*
774  * While streaming the previous changes we have detected that the
775  * transaction is aborted. So there is no point in collecting further
776  * changes for it.
777  */
778  if (txn->concurrent_abort)
779  {
780  /*
781  * We don't need to update memory accounting for this change as we
782  * have not added it to the queue yet.
783  */
784  ReorderBufferReturnChange(rb, change, false);
785  return;
786  }
787 
788  change->lsn = lsn;
789  change->txn = txn;
790 
791  Assert(InvalidXLogRecPtr != lsn);
792  dlist_push_tail(&txn->changes, &change->node);
793  txn->nentries++;
794  txn->nentries_mem++;
795 
796  /* update memory accounting information */
797  ReorderBufferChangeMemoryUpdate(rb, change, true,
798  ReorderBufferChangeSize(change));
799 
800  /* process partial change */
801  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
802 
803  /* check the memory limits and evict something if needed */
804  ReorderBufferCheckMemoryLimit(rb);
805 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 812 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

816 {
817  if (transactional)
818  {
819  MemoryContext oldcontext;
820  ReorderBufferChange *change;
821 
822  Assert(xid != InvalidTransactionId);
823 
824  oldcontext = MemoryContextSwitchTo(rb->context);
825 
826  change = ReorderBufferGetChange(rb);
827  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
828  change->data.msg.prefix = pstrdup(prefix);
829  change->data.msg.message_size = message_size;
830  change->data.msg.message = palloc(message_size);
831  memcpy(change->data.msg.message, message, message_size);
832 
833  ReorderBufferQueueChange(rb, xid, lsn, change, false);
834 
835  MemoryContextSwitchTo(oldcontext);
836  }
837  else
838  {
839  ReorderBufferTXN *txn = NULL;
840  volatile Snapshot snapshot_now = snapshot;
841 
842  if (xid != InvalidTransactionId)
843  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
844 
845  /* setup snapshot to allow catalog access */
846  SetupHistoricSnapshot(snapshot_now, NULL);
847  PG_TRY();
848  {
849  rb->message(rb, txn, lsn, false, prefix, message_size, message);
850 
851  TeardownHistoricSnapshot(false);
852  }
853  PG_CATCH();
854  {
855  TeardownHistoricSnapshot(true);
856  PG_RE_THROW();
857  }
858  PG_END_TRY();
859  }
860 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2066
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:323
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
#define PG_RE_THROW()
Definition: elog.h:354
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2050
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2654 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

2658 {
2659  ReorderBufferTXN *txn;
2660 
2661  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2662 
2663  /* unknown transaction, nothing to do */
2664  if (txn == NULL)
2665  return false;
2666 
2667  /*
2668  * Remember the prepare information to be later used by commit prepared in
2669  * case we skip doing prepare.
2670  */
2671  txn->final_lsn = prepare_lsn;
2672  txn->end_lsn = end_lsn;
2673  txn->xact_time.prepare_time = prepare_time;
2674  txn->origin_id = origin_id;
2675  txn->origin_lsn = origin_lsn;
2676 
2677  return true;
2678 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TimestampTz prepare_time

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2571 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

2576 {
2577  Snapshot snapshot_now;
2578  CommandId command_id = FirstCommandId;
2579 
2580  txn->final_lsn = commit_lsn;
2581  txn->end_lsn = end_lsn;
2582  txn->xact_time.commit_time = commit_time;
2583  txn->origin_id = origin_id;
2584  txn->origin_lsn = origin_lsn;
2585 
2586  /*
2587  * If the transaction was (partially) streamed, we need to commit it in a
2588  * 'streamed' way. That is, we first stream the remaining part of the
2589  * transaction, and then invoke stream_commit message.
2590  *
2591  * Called after everything (origin ID, LSN, ...) is stored in the
2592  * transaction to avoid passing that information directly.
2593  */
2594  if (rbtxn_is_streamed(txn))
2595  {
2596  ReorderBufferStreamCommit(rb, txn);
2597  return;
2598  }
2599 
2600  /*
2601  * If this transaction has no snapshot, it didn't make any changes to the
2602  * database, so there's nothing to decode. Note that
2603  * ReorderBufferCommitChild will have transferred any snapshots from
2604  * subtransactions if there were any.
2605  */
2606  if (txn->base_snapshot == NULL)
2607  {
2608  Assert(txn->ninvalidations == 0);
2609 
2610  /*
2611  * Removing this txn before a commit might result in the computation
2612  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2613  */
2614  if (!rbtxn_prepared(txn))
2615  ReorderBufferCleanupTXN(rb, txn);
2616  return;
2617  }
2618 
2619  snapshot_now = txn->base_snapshot;
2620 
2621  /* Process and send the changes to output plugin. */
2622  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2623  command_id, false);
2624 }
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
Snapshot base_snapshot
#define rbtxn_prepared(txn)
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:603
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *  specinsert 
)
static

Definition at line 1979 of file reorderbuffer.c.

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

1984 {
1985  /* Discard the changes that we just streamed */
1986  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
1987 
1988  /* Free all resources allocated for toast reconstruction */
1989  ReorderBufferToastReset(rb, txn);
1990 
1991  /* Return the spec insert change if it is not NULL */
1992  if (specinsert != NULL)
1993  {
1994  ReorderBufferReturnChange(rb, specinsert, true);
1995  specinsert = NULL;
1996  }
1997 
1998  /*
1999  * For the streaming case, stop the stream and remember the command ID and
2000  * snapshot for the streaming run.
2001  */
2002  if (rbtxn_is_streamed(txn))
2003  {
2004  rb->stream_stop(rb, txn, last_lsn);
2005  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2006  }
2007 }
#define rbtxn_prepared(txn)
#define rbtxn_is_streamed(txn)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
char *  data 
)
static

Definition at line 4202 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

4204 {
4205  ReorderBufferDiskChange *ondisk;
4206  ReorderBufferChange *change;
4207 
4208  ondisk = (ReorderBufferDiskChange *) data;
4209 
4210  change = ReorderBufferGetChange(rb);
4211 
4212  /* copy static part */
4213  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4214 
4215  data += sizeof(ReorderBufferDiskChange);
4216 
4217  /* restore individual stuff */
4218  switch (change->action)
4219  {
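4220  /* fall through these, they're all similar enough */
4221  case REORDER_BUFFER_CHANGE_INSERT:
4222  case REORDER_BUFFER_CHANGE_UPDATE:
4223  case REORDER_BUFFER_CHANGE_DELETE:
4224  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: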
4225  if (change->data.tp.oldtuple)
4226  {
4227  uint32 tuplelen = ((HeapTuple) data)->t_len;
4228 
4229  change->data.tp.oldtuple =
4230  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4231 
4232  /* restore ->tuple */
4233  memcpy(&change->data.tp.oldtuple->tuple, data,
4234  sizeof(HeapTupleData));
4235  data += sizeof(HeapTupleData);
4236 
4237  /* reset t_data pointer into the new tuplebuf */
4238  change->data.tp.oldtuple->tuple.t_data =
4239  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4240 
4241  /* restore tuple data itself */
4242  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4243  data += tuplelen;
4244  }
4245 
4246  if (change->data.tp.newtuple)
4247  {
4248  /* here, data might not be suitably aligned! */
4249  uint32 tuplelen;
4250 
4251  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4252  sizeof(uint32));
4253 
4254  change->data.tp.newtuple =
4255  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4256 
4257  /* restore ->tuple */
4258  memcpy(&change->data.tp.newtuple->tuple, data,
4259  sizeof(HeapTupleData));
4260  data += sizeof(HeapTupleData);
4261 
4262  /* reset t_data pointer into the new tuplebuf */
4263  change->data.tp.newtuple->tuple.t_data =
4264  ReorderBufferTupleBufData(change->data.tp.newtuple);
4265 
4266  /* restore tuple data itself */
4267  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4268  data += tuplelen;
4269  }
4270 
4271  break;
4272  case REORDER_BUFFER_CHANGE_MESSAGE:
4273  {
4274  Size prefix_size;
4275 
4276  /* read prefix */
4277  memcpy(&prefix_size, data, sizeof(Size));
4278  data += sizeof(Size);
4279  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4280  prefix_size);
4281  memcpy(change->data.msg.prefix, data, prefix_size);
4282  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4283  data += prefix_size;
4284 
4285  /* read the message */
4286  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4287  data += sizeof(Size);
4288  change->data.msg.message = MemoryContextAlloc(rb->context,
4289  change->data.msg.message_size);
4290  memcpy(change->data.msg.message, data,
4291  change->data.msg.message_size);
4292  data += change->data.msg.message_size;
4293 
4294  break;
4295  }
4296  case REORDER_BUFFER_CHANGE_INVALIDATION:
4297  {
4298  Size inval_size = sizeof(SharedInvalidationMessage) *
4299  change->data.inval.ninvalidations;
4300 
4301  change->data.inval.invalidations =
4302  MemoryContextAlloc(rb->context, inval_size);
4303 
4304  /* read the message */
4305  memcpy(change->data.inval.invalidations, data, inval_size);
4306 
4307  break;
4308  }
4309  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4310  {
4311  Snapshot oldsnap;
4312  Snapshot newsnap;
4313  Size size;
4314 
4315  oldsnap = (Snapshot) data;
4316 
4317  size = sizeof(SnapshotData) +
4318  sizeof(TransactionId) * oldsnap->xcnt +
4319  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4320 
4321  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4322 
4323  newsnap = change->data.snapshot;
4324 
4325  memcpy(newsnap, data, size);
4326  newsnap->xip = (TransactionId *)
4327  (((char *) newsnap) + sizeof(SnapshotData));
4328  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4329  newsnap->copied = true;
4330  break;
4331  }
4332  /* the base struct contains all the data, easy peasy */
4333  case REORDER_BUFFER_CHANGE_TRUNCATE:
4334  {
4335  Oid *relids;
4336 
4337  relids = ReorderBufferGetRelids(rb,
4338  change->data.truncate.nrelids);
4339  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4340  change->data.truncate.relids = relids;
4341 
4342  break;
4343  }
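4344  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4345  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4346  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4347  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4348  break;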
4349  }
4350 
4351  dlist_push_tail(&txn->changes, &change->node);
4352  txn->nentries_mem++;
4353 
4354  /*
4355  * Update memory accounting for the restored change. We need to do this
4356  * although we don't check the memory limit when restoring the changes in
4357  * this branch (we only do that when initially queueing the changes after
4358  * decoding), because we will release the changes later, and that will
4359  * update the accounting too (subtracting the size from the counters). And
4360  * we don't want to underflow there.
4361  */
4362  ReorderBufferChangeMemoryUpdate(rb, change, true,
4363  ReorderBufferChangeSize(change));
4364 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:587
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:441
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
TransactionId * xip
Definition: snapshot.h:168
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
MemoryContext context
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:727
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
TXNEntryFile *  file,
XLogSegNo *  segno 
)
static

Definition at line 4061 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

4063 {
4064  Size restored = 0;
4065  XLogSegNo last_segno;
4066  dlist_mutable_iter cleanup_iter;
4067  File *fd = &file->vfd;
4068 
4071 
4072  /* free current entries, so we have memory for more */
4073  dlist_foreach_modify(cleanup_iter, &txn->changes)
4074  {
4076  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4077 
4078  dlist_delete(&cleanup->node);
4079  ReorderBufferReturnChange(rb, cleanup, true);
4080  }
4081  txn->nentries_mem = 0;
4082  Assert(dlist_is_empty(&txn->changes));
4083 
4084  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4085 
4086  while (restored < max_changes_in_memory && *segno <= last_segno)
4087  {
4088  int readBytes;
4089  ReorderBufferDiskChange *ondisk;
4090 
4091  if (*fd == -1)
4092  {
4093  char path[MAXPGPATH];
4094 
4095  /* first time in */
4096  if (*segno == 0)
4097  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4098 
4099  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4100 
4101  /*
4102  * No need to care about TLIs here, only used during a single run,
4103  * so each LSN only maps to a specific WAL record.
4104  */
4106  *segno);
4107 
4108  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4109 
4110  /* No harm in resetting the offset even in case of failure */
4111  file->curOffset = 0;
4112 
4113  if (*fd < 0 && errno == ENOENT)
4114  {
4115  *fd = -1;
4116  (*segno)++;
4117  continue;
4118  }
4119  else if (*fd < 0)
4120  ereport(ERROR,
4122  errmsg("could not open file \"%s\": %m",
4123  path)));
4124  }
4125 
4126  /*
4127  * Read the statically sized part of a change which has information
4128  * about the total size. If we couldn't read a record, we're at the
4129  * end of this file.
4130  */
4132  readBytes = FileRead(file->vfd, rb->outbuf,
4133  sizeof(ReorderBufferDiskChange),
4135 
4136  /* eof */
4137  if (readBytes == 0)
4138  {
4139  FileClose(*fd);
4140  *fd = -1;
4141  (*segno)++;
4142  continue;
4143  }
4144  else if (readBytes < 0)
4145  ereport(ERROR,
4147  errmsg("could not read from reorderbuffer spill file: %m")));
4148  else if (readBytes != sizeof(ReorderBufferDiskChange))