PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirm(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirm

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4798 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4799 {
4800  char path[MAXPGPATH];
4801  int fd;
4802  int readBytes;
4804 
4805  sprintf(path, "pg_logical/mappings/%s", fname);
4806  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4807  if (fd < 0)
4808  ereport(ERROR,
4810  errmsg("could not open file \"%s\": %m", path)));
4811 
4812  while (true)
4813  {
4816  ReorderBufferTupleCidEnt *new_ent;
4817  bool found;
4818 
4819  /* be careful about padding */
4820  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4821 
4822  /* read all mappings till the end of the file */
4824  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4826 
4827  if (readBytes < 0)
4828  ereport(ERROR,
4830  errmsg("could not read file \"%s\": %m",
4831  path)));
4832  else if (readBytes == 0) /* EOF */
4833  break;
4834  else if (readBytes != sizeof(LogicalRewriteMappingData))
4835  ereport(ERROR,
4837  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4838  path, readBytes,
4839  (int32) sizeof(LogicalRewriteMappingData))));
4840 
4841  key.relnode = map.old_node;
4842  ItemPointerCopy(&map.old_tid,
4843  &key.tid);
4844 
4845 
4846  ent = (ReorderBufferTupleCidEnt *)
4847  hash_search(tuplecid_data,
4848  (void *) &key,
4849  HASH_FIND,
4850  NULL);
4851 
4852  /* no existing mapping, no need to update */
4853  if (!ent)
4854  continue;
4855 
4856  key.relnode = map.new_node;
4857  ItemPointerCopy(&map.new_tid,
4858  &key.tid);
4859 
4860  new_ent = (ReorderBufferTupleCidEnt *)
4861  hash_search(tuplecid_data,
4862  (void *) &key,
4863  HASH_ENTER,
4864  &found);
4865 
4866  if (found)
4867  {
4868  /*
4869  * Make sure the existing mapping makes sense. We sometimes update
4870  * old records that did not yet have a cmax (e.g. pg_class' own
4871  * entry while rewriting it) during rewrites, so allow that.
4872  */
4873  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4874  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4875  }
4876  else
4877  {
4878  /* update mapping */
4879  new_ent->cmin = ent->cmin;
4880  new_ent->cmax = ent->cmax;
4881  new_ent->combocid = ent->combocid;
4882  }
4883  }
4884 
4885  if (CloseTransientFile(fd) != 0)
4886  ereport(ERROR,
4888  errmsg("could not close file \"%s\": %m", path)));
4889 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
signed int int32
Definition: c.h:429
#define sprintf
Definition: port.h:218
#define ERROR
Definition: elog.h:45
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:717
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
int CloseTransientFile(int fd)
Definition: fd.c:2581
#define InvalidCommandId
Definition: c.h:604
#define ereport(elevel,...)
Definition: elog.h:155
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:804
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1488
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 913 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

914 {
915 #ifdef USE_ASSERT_CHECKING
916  dlist_iter iter;
917  XLogRecPtr prev_lsn = txn->first_lsn;
918 
919  dlist_foreach(iter, &txn->changes)
920  {
921  ReorderBufferChange *cur_change;
922 
923  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
924 
926  Assert(cur_change->lsn != InvalidXLogRecPtr);
927  Assert(txn->first_lsn <= cur_change->lsn);
928 
929  if (txn->end_lsn != InvalidXLogRecPtr)
930  Assert(cur_change->lsn <= txn->end_lsn);
931 
932  Assert(prev_lsn <= cur_change->lsn);
933 
934  prev_lsn = cur_change->lsn;
935  }
936 #endif
937 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 856 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

857 {
858 #ifdef USE_ASSERT_CHECKING
859  dlist_iter iter;
860  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
861  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
862 
863  dlist_foreach(iter, &rb->toplevel_by_lsn)
864  {
866  iter.cur);
867 
868  /* start LSN must be set */
869  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
870 
871  /* If there is an end LSN, it must be higher than start LSN */
872  if (cur_txn->end_lsn != InvalidXLogRecPtr)
873  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
874 
875  /* Current initial LSN must be strictly higher than previous */
876  if (prev_first_lsn != InvalidXLogRecPtr)
877  Assert(prev_first_lsn < cur_txn->first_lsn);
878 
879  /* known-as-subtxn txns must not be listed */
880  Assert(!rbtxn_is_known_subxact(cur_txn));
881 
882  prev_first_lsn = cur_txn->first_lsn;
883  }
884 
886  {
888  base_snapshot_node,
889  iter.cur);
890 
891  /* base snapshot (and its LSN) must be set */
892  Assert(cur_txn->base_snapshot != NULL);
894 
895  /* current LSN must be strictly higher than previous */
896  if (prev_base_snap_lsn != InvalidXLogRecPtr)
897  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
898 
899  /* known-as-subtxn txns must not be listed */
900  Assert(!rbtxn_is_known_subxact(cur_txn));
901 
902  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
903  }
904 #endif
905 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 4906 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4907 {
4910 
4911  if (a->lsn < b->lsn)
4912  return -1;
4913  else if (a->lsn > b->lsn)
4914  return 1;
4915  return 0;
4916 }
#define lfirst(lc)
Definition: pg_list.h:169

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2757 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2758 {
2759  ReorderBufferTXN *txn;
2760 
2761  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2762  false);
2763 
2764  /* unknown, nothing to remove */
2765  if (txn == NULL)
2766  return;
2767 
2768  /* For streamed transactions notify the remote node about the abort. */
2769  if (rbtxn_is_streamed(txn))
2770  {
2771  rb->stream_abort(rb, txn, lsn);
2772 
2773  /*
2774  * We might have decoded changes for this transaction that could load
2775  * the cache as per the current transaction's view (consider DDL's
2776  * happened in this transaction). We don't want the decoding of future
2777  * transactions to use those cache entries so execute invalidations.
2778  */
2779  if (txn->ninvalidations > 0)
2781  txn->invalidations);
2782  }
2783 
2784  /* cosmetic... */
2785  txn->final_lsn = lsn;
2786 
2787  /* remove potential on-disk data, and deallocate */
2788  ReorderBufferCleanupTXN(rb, txn);
2789 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2799 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2800 {
2801  dlist_mutable_iter it;
2802 
2803  /*
2804  * Iterate through all (potential) toplevel TXNs and abort all that are
2805  * older than what possibly can be running. Once we've found the first
2806  * that is alive we stop, there might be some that acquired an xid earlier
2807  * but started writing later, but it's unlikely and they will be cleaned
2808  * up in a later call to this function.
2809  */
2811  {
2812  ReorderBufferTXN *txn;
2813 
2814  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2815 
2816  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2817  {
2818  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2819 
2820  /* remove potential on-disk data, and deallocate this tx */
2821  ReorderBufferCleanupTXN(rb, txn);
2822  }
2823  else
2824  return;
2825  }
2826 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:227

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3136 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

3139 {
3140  ReorderBufferTXN *txn;
3141  MemoryContext oldcontext;
3142  ReorderBufferChange *change;
3143 
3144  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3145 
3146  oldcontext = MemoryContextSwitchTo(rb->context);
3147 
3148  /*
3149  * Collect all the invalidations under the top transaction so that we can
3150  * execute them all together. See comment atop this function
3151  */
3152  if (txn->toptxn)
3153  txn = txn->toptxn;
3154 
3155  Assert(nmsgs > 0);
3156 
3157  /* Accumulate invalidations. */
3158  if (txn->ninvalidations == 0)
3159  {
3160  txn->ninvalidations = nmsgs;
3162  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3163  memcpy(txn->invalidations, msgs,
3164  sizeof(SharedInvalidationMessage) * nmsgs);
3165  }
3166  else
3167  {
3170  (txn->ninvalidations + nmsgs));
3171 
3172  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3173  nmsgs * sizeof(SharedInvalidationMessage));
3174  txn->ninvalidations += nmsgs;
3175  }
3176 
3177  change = ReorderBufferGetChange(rb);
3179  change->data.inval.ninvalidations = nmsgs;
3180  change->data.inval.invalidations = (SharedInvalidationMessage *)
3181  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3182  memcpy(change->data.inval.invalidations, msgs,
3183  sizeof(SharedInvalidationMessage) * nmsgs);
3184 
3185  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3186 
3187  MemoryContextSwitchTo(oldcontext);
3188 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
struct ReorderBufferChange::@96::@101 inval
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
union ReorderBufferChange::@96 data

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3013 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

3015 {
3017 
3018  change->data.command_id = cid;
3020 
3021  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3022 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@96 data

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3100 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

3104 {
3106  ReorderBufferTXN *txn;
3107 
3108  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3109 
3110  change->data.tuplecid.node = node;
3111  change->data.tuplecid.tid = tid;
3112  change->data.tuplecid.cmin = cmin;
3113  change->data.tuplecid.cmax = cmax;
3114  change->data.tuplecid.combocid = combocid;
3115  change->lsn = lsn;
3116  change->txn = txn;
3118 
3119  dlist_push_tail(&txn->tuplecids, &change->node);
3120  txn->ntuplecids++;
3121 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
struct ReorderBufferChange::@96::@100 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
union ReorderBufferChange::@96 data
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2964 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2966 {
2968 
2969  change->data.snapshot = snap;
2971 
2972  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2973 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@96 data

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 299 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

300 {
301  ReorderBuffer *buffer;
302  HASHCTL hash_ctl;
303  MemoryContext new_ctx;
304 
305  Assert(MyReplicationSlot != NULL);
306 
307  /* allocate memory in own context, to have better accountability */
309  "ReorderBuffer",
311 
312  buffer =
313  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
314 
315  memset(&hash_ctl, 0, sizeof(hash_ctl));
316 
317  buffer->context = new_ctx;
318 
319  buffer->change_context = SlabContextCreate(new_ctx,
320  "Change",
322  sizeof(ReorderBufferChange));
323 
324  buffer->txn_context = SlabContextCreate(new_ctx,
325  "TXN",
327  sizeof(ReorderBufferTXN));
328 
329  buffer->tup_context = GenerationContextCreate(new_ctx,
330  "Tuples",
332 
333  hash_ctl.keysize = sizeof(TransactionId);
334  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
335  hash_ctl.hcxt = buffer->context;
336 
337  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
339 
341  buffer->by_txn_last_txn = NULL;
342 
343  buffer->outbuf = NULL;
344  buffer->outbufsize = 0;
345  buffer->size = 0;
346 
347  buffer->spillTxns = 0;
348  buffer->spillCount = 0;
349  buffer->spillBytes = 0;
350  buffer->streamTxns = 0;
351  buffer->streamCount = 0;
352  buffer->streamBytes = 0;
353 
355 
356  dlist_init(&buffer->toplevel_by_lsn);
358 
359  /*
360  * Ensure there's no stale data from prior uses of this slot, in case some
361  * prior exit avoided calling ReorderBufferFree. Failure to do this can
362  * produce duplicated txns, and it's very cheap if there's nothing there.
363  */
365 
366  return buffer;
367 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:76
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:156
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:75
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define NameStr(name)
Definition: c.h:681
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1889 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1892 {
1893  if (streaming)
1894  rb->stream_change(rb, txn, relation, change);
1895  else
1896  rb->apply_change(rb, txn, relation, change);
1897 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1917 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1919 {
1920  if (streaming)
1921  rb->stream_message(rb, txn, change->lsn, true,
1922  change->data.msg.prefix,
1923  change->data.msg.message_size,
1924  change->data.msg.message);
1925  else
1926  rb->message(rb, txn, change->lsn, true,
1927  change->data.msg.prefix,
1928  change->data.msg.message_size,
1929  change->data.msg.message);
1930 }
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
struct ReorderBufferChange::@96::@99 msg
union ReorderBufferChange::@96 data

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  nrelations,
Relation *  relations,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1903 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1906 {
1907  if (streaming)
1908  rb->stream_truncate(rb, txn, nrelations, relations, change);
1909  else
1910  rb->apply_truncate(rb, txn, nrelations, relations, change);
1911 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 999 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

1001 {
1002  ReorderBufferTXN *txn;
1003  ReorderBufferTXN *subtxn;
1004  bool new_top;
1005  bool new_sub;
1006 
1007  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1008  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1009 
1010  if (!new_sub)
1011  {
1012  if (rbtxn_is_known_subxact(subtxn))
1013  {
1014  /* already associated, nothing to do */
1015  return;
1016  }
1017  else
1018  {
1019  /*
1020  * We already saw this transaction, but initially added it to the
1021  * list of top-level txns. Now that we know it's not top-level,
1022  * remove it from there.
1023  */
1024  dlist_delete(&subtxn->node);
1025  }
1026  }
1027 
1028  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1029  subtxn->toplevel_xid = xid;
1030  Assert(subtxn->nsubtxns == 0);
1031 
1032  /* set the reference to top-level transaction */
1033  subtxn->toptxn = txn;
1034 
1035  /* add to subtransaction list */
1036  dlist_push_tail(&txn->subtxns, &subtxn->node);
1037  txn->nsubtxns++;
1038 
1039  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1040  ReorderBufferTransferSnapToParent(txn, subtxn);
1041 
1042  /* Verify LSN-ordering invariant */
1043  AssertTXNLsnOrder(rb);
1044 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1652 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1653 {
1654  dlist_iter iter;
1655  HASHCTL hash_ctl;
1656 
1657  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1658  return;
1659 
1660  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1661  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1662  hash_ctl.hcxt = rb->context;
1663 
1664  /*
1665  * create the hash with the exact number of to-be-stored tuplecids from
1666  * the start
1667  */
1668  txn->tuplecid_hash =
1669  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1670  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1671 
1672  dlist_foreach(iter, &txn->tuplecids)
1673  {
1674  ReorderBufferTupleCidKey key;
1675  ReorderBufferTupleCidEnt *ent;
1676  bool found;
1677  ReorderBufferChange *change;
1678 
1679  change = dlist_container(ReorderBufferChange, node, iter.cur);
1680 
1681  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1682 
1683  /* be careful about padding */
1684  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1685 
1686  key.relnode = change->data.tuplecid.node;
1687 
1688  ItemPointerCopy(&change->data.tuplecid.tid,
1689  &key.tid);
1690 
1691  ent = (ReorderBufferTupleCidEnt *)
1692  hash_search(txn->tuplecid_hash,
1693  (void *) &key,
1694  HASH_ENTER | HASH_FIND,
1695  &found);
1696  if (!found)
1697  {
1698  ent->cmin = change->data.tuplecid.cmin;
1699  ent->cmax = change->data.tuplecid.cmax;
1700  ent->combocid = change->data.tuplecid.combocid;
1701  }
1702  else
1703  {
1704  /*
1705  * Maybe we already saw this tuple before in this transaction, but
1706  * if so it must have the same cmin.
1707  */
1708  Assert(ent->cmin == change->data.tuplecid.cmin);
1709 
1710  /*
1711  * cmax may be initially invalid, but once set it can only grow,
1712  * and never become invalid again.
1713  */
1714  Assert((ent->cmax == InvalidCommandId) ||
1715  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1716  (change->data.tuplecid.cmax > ent->cmax)));
1717  ent->cmax = change->data.tuplecid.cmax;
1718  }
1719  }
1720 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
struct ReorderBufferChange::@96::@100 tuplecid
Size entrysize
Definition: hsearch.h:76
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext context
#define InvalidCommandId
Definition: c.h:604
Size keysize
Definition: hsearch.h:75
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_has_catalog_changes(txn)
union ReorderBufferChange::@96 data
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *  rb)
inline static

Definition at line 3760 of file reorderbuffer.c.

References XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3761 {
3762  LogicalDecodingContext *ctx = rb->private_data;
3763  SnapBuild *builder = ctx->snapshot_builder;
3764 
3765  /* We can't start streaming unless a consistent state is reached. */
3766  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3767  return false;
3768 
3769  /*
3770  * We can't start streaming immediately even if the streaming is enabled
3771  * because we previously decoded this transaction and now just are
3772  * restarting.
3773  */
3774  if (ReorderBufferCanStream(rb) &&
3775  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3776  return true;
3777 
3778  return false;
3779 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:385
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *  rb)
inline static

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
bool  addition 
)
static

Definition at line 3038 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferCanStream(), ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

3041 {
3042  Size sz;
3043  ReorderBufferTXN *txn;
3044  ReorderBufferTXN *toptxn = NULL;
3045 
3046  Assert(change->txn);
3047 
3048  /*
3049  * Ignore tuple CID changes, because those are not evicted when reaching
3050  * memory limit. So we just don't count them, because it might easily
3051  * trigger a pointless attempt to spill.
3052  */
3053  if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3054  return;
3055 
3056  txn = change->txn;
3057 
3058  /* If streaming supported, update the total size in top level as well. */
3059  if (ReorderBufferCanStream(rb))
3060  {
3061  if (txn->toptxn != NULL)
3062  toptxn = txn->toptxn;
3063  else
3064  toptxn = txn;
3065  }
3066 
3067  sz = ReorderBufferChangeSize(change);
3068 
3069  if (addition)
3070  {
3071  txn->size += sz;
3072  rb->size += sz;
3073 
3074  /* Update the total size in the top transaction. */
3075  if (toptxn)
3076  toptxn->total_size += sz;
3077  }
3078  else
3079  {
3080  Assert((rb->size >= sz) && (txn->size >= sz));
3081  txn->size -= sz;
3082  rb->size -= sz;
3083 
3084  /* Update the total size in the top transaction. */
3085  if (toptxn)
3086  toptxn->total_size -= sz;
3087  }
3088 
3089  Assert(txn->size <= rb->size);
3090 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *  change)
static

Definition at line 3900 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

3901 {
3902  Size sz = sizeof(ReorderBufferChange);
3903 
3904  switch (change->action)
3905  {
3906  /* fall through these, they're all similar enough */
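3907  case REORDER_BUFFER_CHANGE_INSERT:
3908  case REORDER_BUFFER_CHANGE_UPDATE:
3909  case REORDER_BUFFER_CHANGE_DELETE:
3910  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3911  {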
3912  ReorderBufferTupleBuf *oldtup,
3913  *newtup;
3914  Size oldlen = 0;
3915  Size newlen = 0;
3916 
3917  oldtup = change->data.tp.oldtuple;
3918  newtup = change->data.tp.newtuple;
3919 
3920  if (oldtup)
3921  {
3922  sz += sizeof(HeapTupleData);
3923  oldlen = oldtup->tuple.t_len;
3924  sz += oldlen;
3925  }
3926 
3927  if (newtup)
3928  {
3929  sz += sizeof(HeapTupleData);
3930  newlen = newtup->tuple.t_len;
3931  sz += newlen;
3932  }
3933 
3934  break;
3935  }
3936  case REORDER_BUFFER_CHANGE_MESSAGE:
3937  {
3938  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3939 
3940  sz += prefix_size + change->data.msg.message_size +
3941  sizeof(Size) + sizeof(Size);
3942 
3943  break;
3944  }
3945  case REORDER_BUFFER_CHANGE_INVALIDATION:
3946  {
3947  sz += sizeof(SharedInvalidationMessage) *
3948  change->data.inval.ninvalidations;
3949  break;
3950  }
3951  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3952  {
3953  Snapshot snap;
3954 
3955  snap = change->data.snapshot;
3956 
3957  sz += sizeof(SnapshotData) +
3958  sizeof(TransactionId) * snap->xcnt +
3959  sizeof(TransactionId) * snap->subxcnt;
3960 
3961  break;
3962  }
3963  case REORDER_BUFFER_CHANGE_TRUNCATE:
3964  {
3965  sz += sizeof(Oid) * change->data.truncate.nrelids;
3966 
3967  break;
3968  }
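3969  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3970  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3971  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3972  /* ReorderBufferChange contains everything important */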
3973  break;
3974  }
3975 
3976  return sz;
3977 }
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
struct ReorderBufferChange::@96::@99 msg
struct ReorderBufferChange::@96::@97 tp
struct ReorderBufferChange::@96::@98 truncate
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@96::@101 inval
uint32 xcnt
Definition: snapshot.h:169
union ReorderBufferChange::@96 data
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *  rb)
static

Definition at line 3383 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3384 {
3385  ReorderBufferTXN *txn;
3386 
3387  /* bail out if we haven't exceeded the memory limit */
3388  if (rb->size < logical_decoding_work_mem * 1024L)
3389  return;
3390 
3391  /*
3392  * Loop until we reach under the memory limit. One might think that just
3393  * by evicting the largest (sub)transaction we will come under the memory
3394  * limit based on assumption that the selected transaction is at least as
3395  * large as the most recent change (which caused us to go over the memory
3396  * limit). However, that is not true because a user can reduce the
3397  * logical_decoding_work_mem to a smaller value before the most recent
3398  * change.
3399  */
3400  while (rb->size >= logical_decoding_work_mem * 1024L)
3401  {
3402  /*
3403  * Pick the largest transaction (or subtransaction) and evict it from
3404  * memory by streaming, if possible. Otherwise, spill to disk.
3405  */
3406  if (ReorderBufferCanStartStreaming(rb) &&
3407  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3408  {
3409  /* we know there has to be one, because the size is not zero */
3410  Assert(txn && !txn->toptxn);
3411  Assert(txn->total_size > 0);
3412  Assert(rb->size >= txn->total_size);
3413 
3414  ReorderBufferStreamTXN(rb, txn);
3415  }
3416  else
3417  {
3418  /*
3419  * Pick the largest transaction (or subtransaction) and evict it
3420  * from memory by serializing it to disk.
3421  */
3422  txn = ReorderBufferLargestTXN(rb);
3423 
3424  /* we know there has to be one, because the size is not zero */
3425  Assert(txn);
3426  Assert(txn->size > 0);
3427  Assert(rb->size >= txn->size);
3428 
3429  ReorderBufferSerializeTXN(rb, txn);
3430  }
3431 
3432  /*
3433  * After eviction, the transaction should have no entries in memory,
3434  * and should use 0 bytes for changes.
3435  */
3436  Assert(txn->size == 0);
3437  Assert(txn->nentries_mem == 0);
3438  }
3439 
3440  /* We must be under the memory limit now. */
3441  Assert(rb->size < logical_decoding_work_mem * 1024L);
3442 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4321 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

4322 {
4323  DIR *spill_dir;
4324  struct dirent *spill_de;
4325  struct stat statbuf;
4326  char path[MAXPGPATH * 2 + 12];
4327 
4328  sprintf(path, "pg_replslot/%s", slotname);
4329 
4330  /* we're only handling directories here, skip if it's not ours */
4331  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4332  return;
4333 
4334  spill_dir = AllocateDir(path);
4335  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4336  {
4337  /* only look at names that can be ours */
4338  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4339  {
4340  snprintf(path, sizeof(path),
4341  "pg_replslot/%s/%s", slotname,
4342  spill_de->d_name);
4343 
4344  if (unlink(path) != 0)
4345  ereport(ERROR,
4346  (errcode_for_file_access(),
4347  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4348  path, slotname)));
4349  }
4350  }
4351  FreeDir(spill_dir);
4352 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2696
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:218
Definition: dirent.c:25
#define ERROR
Definition: elog.h:45
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:717
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2615
#define ereport(elevel,...)
Definition: elog.h:155
#define S_ISDIR(m)
Definition: win32_port.h:316
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:905
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:216
int FreeDir(DIR *dir)
Definition: fd.c:2733

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1429 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

1430 {
1431  bool found;
1432  dlist_mutable_iter iter;
1433 
1434  /* cleanup subtransactions & their changes */
1435  dlist_foreach_modify(iter, &txn->subtxns)
1436  {
1437  ReorderBufferTXN *subtxn;
1438 
1439  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1440 
1441  /*
1442  * Subtransactions are always associated to the toplevel TXN, even if
1443  * they originally were happening inside another subtxn, so we won't
1444  * ever recurse more than one level deep here.
1445  */
1446  Assert(rbtxn_is_known_subxact(subtxn));
1447  Assert(subtxn->nsubtxns == 0);
1448 
1449  ReorderBufferCleanupTXN(rb, subtxn);
1450  }
1451 
1452  /* cleanup changes in the txn */
1453  dlist_foreach_modify(iter, &txn->changes)
1454  {
1455  ReorderBufferChange *change;
1456 
1457  change = dlist_container(ReorderBufferChange, node, iter.cur);
1458 
1459  /* Check we're not mixing changes from different transactions. */
1460  Assert(change->txn == txn);
1461 
1462  ReorderBufferReturnChange(rb, change, true);
1463  }
1464 
1465  /*
1466  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1467  * They are always stored in the toplevel transaction.
1468  */
1469  dlist_foreach_modify(iter, &txn->tuplecids)
1470  {
1471  ReorderBufferChange *change;
1472 
1473  change = dlist_container(ReorderBufferChange, node, iter.cur);
1474 
1475  /* Check we're not mixing changes from different transactions. */
1476  Assert(change->txn == txn);
1477  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1478 
1479  ReorderBufferReturnChange(rb, change, true);
1480  }
1481 
1482  /*
1483  * Cleanup the base snapshot, if set.
1484  */
1485  if (txn->base_snapshot != NULL)
1486  {
1487  SnapBuildSnapDecRefcount(txn->base_snapshot);
1488  dlist_delete(&txn->base_snapshot_node);
1489  }
1490 
1491  /*
1492  * Cleanup the snapshot for the last streamed run.
1493  */
1494  if (txn->snapshot_now != NULL)
1495  {
1496  Assert(rbtxn_is_streamed(txn));
1497  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1498  }
1499 
1500  /*
1501  * Remove TXN from its containing list.
1502  *
1503  * Note: if txn is known as subxact, we are deleting the TXN from its
1504  * parent's list of known subxacts; this leaves the parent's nsubxacts
1505  * count too high, but we don't care. Otherwise, we are deleting the TXN
1506  * from the LSN-ordered list of toplevel TXNs.
1507  */
1508  dlist_delete(&txn->node);
1509 
1510  /* now remove reference from buffer */
1511  hash_search(rb->by_txn,
1512  (void *) &txn->xid,
1513  HASH_REMOVE,
1514  &found);
1515  Assert(found);
1516 
1517  /* remove entries spilled to disk */
1518  if (rbtxn_is_serialized(txn))
1519  ReorderBufferRestoreCleanup(rb, txn);
1520 
1521  /* deallocate */
1522  ReorderBufferReturnTXN(rb, txn);
1523 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:804
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:409
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2578 of file reorderbuffer.c.

References InvalidXLogRecPtr, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2582 {
2583  ReorderBufferTXN *txn;
2584 
2585  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2586  false);
2587 
2588  /* unknown transaction, nothing to replay */
2589  if (txn == NULL)
2590  return;
2591 
2592  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2593  origin_id, origin_lsn);
2594 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1119 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

1122 {
1123  ReorderBufferTXN *subtxn;
1124 
1125  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1126  InvalidXLogRecPtr, false);
1127 
1128  /*
1129  * No need to do anything if that subtxn didn't contain any changes
1130  */
1131  if (!subtxn)
1132  return;
1133 
1134  subtxn->final_lsn = commit_lsn;
1135  subtxn->end_lsn = end_lsn;
1136 
1137  /*
1138  * Assign this subxact as a child of the toplevel xact (no-op if already
1139  * done.)
1140  */
1141  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1142 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1728 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1730 {
1731  Snapshot snap;
1732  dlist_iter iter;
1733  int i = 0;
1734  Size size;
1735 
1736  size = sizeof(SnapshotData) +
1737  sizeof(TransactionId) * orig_snap->xcnt +
1738  sizeof(TransactionId) * (txn->nsubtxns + 1);
1739 
1740  snap = MemoryContextAllocZero(rb->context, size);
1741  memcpy(snap, orig_snap, sizeof(SnapshotData));
1742 
1743  snap->copied = true;
1744  snap->active_count = 1; /* mark as active so nobody frees it */
1745  snap->regd_count = 0;
1746  snap->xip = (TransactionId *) (snap + 1);
1747 
1748  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1749 
1750  /*
1751  * snap->subxip contains all txids that belong to our transaction which we
1752  * need to check via cmin/cmax. That's why we store the toplevel
1753  * transaction in there as well.
1754  */
1755  snap->subxip = snap->xip + snap->xcnt;
1756  snap->subxip[i++] = txn->xid;
1757 
1758  /*
1759  * subxcnt isn't decreased when subtransactions abort, so count manually.
1760  * Since it's an upper boundary it is safe to use it for the allocation
1761  * above.
1762  */
1763  snap->subxcnt = 1;
1764 
1765  dlist_foreach(iter, &txn->subtxns)
1766  {
1767  ReorderBufferTXN *sub_txn;
1768 
1769  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1770  snap->subxip[i++] = sub_txn->xid;
1771  snap->subxcnt++;
1772  }
1773 
1774  /* sort so we can bsearch() later */
1775  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1776 
1777  /* store the specified current CommandId */
1778  snap->curcid = cid;
1779 
1780  return snap;
1781 }
uint32 TransactionId
Definition: c.h:587
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:540
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:504
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3195 of file reorderbuffer.c.

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

3196 {
3197  int i;
3198 
3199  for (i = 0; i < nmsgs; i++)
3201 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:562
int i

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  initial_consistent_point,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2673 of file reorderbuffer.c.

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeAbort(), and DecodeCommit().

2678 {
2679  ReorderBufferTXN *txn;
2680  XLogRecPtr prepare_end_lsn;
2681  TimestampTz prepare_time;
2682 
2683  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2684 
2685  /* unknown transaction, nothing to do */
2686  if (txn == NULL)
2687  return;
2688 
2689  /*
2690  * By this time the txn has the prepare record information, remember it to
2691  * be later used for rollback.
2692  */
2693  prepare_end_lsn = txn->end_lsn;
2694  prepare_time = txn->commit_time;
2695 
2696  /* add the gid in the txn */
2697  txn->gid = pstrdup(gid);
2698 
2699  /*
2700  * It is possible that this transaction is not decoded at prepare time
2701  * either because by that time we didn't have a consistent snapshot or it
2702  * was decoded earlier but we have restarted. We only need to send the
2703  * prepare if it was not decoded earlier. We don't need to decode the xact
2704  * for aborts if it is not done already.
2705  */
2706  if ((txn->final_lsn < initial_consistent_point) && is_commit)
2707  {
2708  txn->txn_flags |= RBTXN_PREPARE;
2709 
2710  /*
2711  * The prepare info must have been updated in txn even if we skip
2712  * prepare.
2713  */
2715 
2716  /*
2717  * By this time the txn has the prepare record information and it is
2718  * important to use that so that downstream gets the accurate
2719  * information. If instead, we have passed commit information here
2720  * then downstream can behave as it has already replayed commit
2721  * prepared after the restart.
2722  */
2723  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2724  txn->commit_time, txn->origin_id, txn->origin_lsn);
2725  }
2726 
2727  txn->final_lsn = commit_lsn;
2728  txn->end_lsn = end_lsn;
2729  txn->commit_time = commit_time;
2730  txn->origin_id = origin_id;
2731  txn->origin_lsn = origin_lsn;
2732 
2733  if (is_commit)
2734  rb->commit_prepared(rb, txn, commit_lsn);
2735  else
2736  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2737 
2738  /* cleanup: make sure there's no cache pollution */
2740  txn->invalidations);
2741  ReorderBufferCleanupTXN(rb, txn);
2742 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
ReorderBufferRollbackPreparedCB rollback_prepared
SharedInvalidationMessage * invalidations
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
ReorderBufferCommitPreparedCB commit_prepared

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2842 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2843 {
2844  ReorderBufferTXN *txn;
2845 
2846  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2847  false);
2848 
2849  /* unknown, nothing to forget */
2850  if (txn == NULL)
2851  return;
2852 
2853  /* For streamed transactions notify the remote node about the abort. */
2854  if (rbtxn_is_streamed(txn))
2855  rb->stream_abort(rb, txn, lsn);
2856 
2857  /* cosmetic... */
2858  txn->final_lsn = lsn;
2859 
2860  /*
2861  * Process cache invalidation messages if there are any. Even if we're not
2862  * interested in the transaction's contents, it could have manipulated the
2863  * catalog and we need to update the caches according to that.
2864  */
2865  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2867  txn->invalidations);
2868  else
2869  Assert(txn->ninvalidations == 0);
2870 
2871  /* remove potential on-disk data, and deallocate */
2872  ReorderBufferCleanupTXN(rb, txn);
2873 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 373 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

374 {
375  MemoryContext context = rb->context;
376 
377  /*
378  * We free separately allocated data by entirely scrapping reorderbuffer's
379  * memory context.
380  */
381  MemoryContextDelete(context);
382 
383  /* Free disk space used by unconsumed reorder buffers */
385 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
ReplicationSlotPersistentData data
Definition: slot.h:156
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:681
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1787 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1788 {
1789  if (snap->copied)
1790  pfree(snap);
1791  else
1793 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1057
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:409

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 944 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

945 {
946  ReorderBufferTXN *txn;
947 
948  AssertTXNLsnOrder(rb);
949 
951  return NULL;
952 
954 
957  return txn;
958 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 972 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

973 {
974  ReorderBufferTXN *txn;
975 
976  AssertTXNLsnOrder(rb);
977 
979  return InvalidTransactionId;
980 
981  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
983  return txn->base_snapshot->xmin;
984 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 570 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

571 {
572  Oid *relids;
573  Size alloc_len;
574 
575  alloc_len = sizeof(Oid) * nrelids;
576 
577  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
578 
579  return relids;
580 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 534 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

535 {
536  ReorderBufferTupleBuf *tuple;
537  Size alloc_len;
538 
539  alloc_len = tuple_len + SizeofHeapTupleHeader;
540 
541  tuple = (ReorderBufferTupleBuf *)
543  sizeof(ReorderBufferTupleBuf) +
544  MAXIMUM_ALIGNOF + alloc_len);
545  tuple->alloc_tuple_size = alloc_len;
546  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
547 
548  return tuple;
549 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 391 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

392 {
393  ReorderBufferTXN *txn;
394 
395  txn = (ReorderBufferTXN *)
397 
398  memset(txn, 0, sizeof(ReorderBufferTXN));
399 
400  dlist_init(&txn->changes);
401  dlist_init(&txn->tuplecids);
402  dlist_init(&txn->subtxns);
403 
404  /* InvalidCommandId is not zero, so set it explicitly */
406  txn->output_plugin_private = NULL;
407 
408  return txn;
409 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:604
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
void * output_plugin_private
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2915 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), ReorderBufferForget(), and ReorderBufferInvalidate().

2917 {
2918  bool use_subtxn = IsTransactionOrTransactionBlock();
2919  int i;
2920 
2921  if (use_subtxn)
2922  BeginInternalSubTransaction("replay");
2923 
2924  /*
2925  * Force invalidations to happen outside of a valid transaction - that way
2926  * entries will just be marked as invalid without accessing the catalog.
2927  * That's advantageous because we don't need to setup the full state
2928  * necessary for catalog access.
2929  */
2930  if (use_subtxn)
2932 
2933  for (i = 0; i < ninvalidations; i++)
2934  LocalExecuteInvalidationMessage(&invalidations[i]);
2935 
2936  if (use_subtxn)
2938 }
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:562
int i

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2884 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2885 {
2886  ReorderBufferTXN *txn;
2887 
2888  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2889  false);
2890 
2891  /* unknown, nothing to do */
2892  if (txn == NULL)
2893  return;
2894 
2895  /*
2896  * Process cache invalidation messages if there are any. Even if we're not
2897  * interested in the transaction's contents, it could have manipulated the
2898  * catalog and we need to update the caches according to that.
2899  */
2900  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2902  txn->invalidations);
2903  else
2904  Assert(txn->ninvalidations == 0);
2905 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1161 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1162 {
1164  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1165  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1166 
1167  if (pos_a < pos_b)
1168  return 1;
1169  else if (pos_a == pos_b)
1170  return 0;
1171  return -1;
1172 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1398 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1400 {
1401  int32 off;
1402 
1403  for (off = 0; off < state->nr_txns; off++)
1404  {
1405  if (state->entries[off].file.vfd != -1)
1406  FileClose(state->entries[off].file.vfd);
1407  }
1408 
1409  /* free memory we might have "leaked" in the last *Next call */
1410  if (!dlist_is_empty(&state->old_change))
1411  {
1412  ReorderBufferChange *change;
1413 
1414  change = dlist_container(ReorderBufferChange, node,
1415  dlist_pop_head_node(&state->old_change));
1416  ReorderBufferReturnChange(rb, change, true);
1417  Assert(dlist_is_empty(&state->old_change));
1418  }
1419 
1420  binaryheap_free(state->heap);
1421  pfree(state);
1422 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
void FileClose(File file)
Definition: fd.c:1854
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1184 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1186 {
1187  Size nr_txns = 0;
1189  dlist_iter cur_txn_i;
1190  int32 off;
1191 
1192  *iter_state = NULL;
1193 
1194  /* Check ordering of changes in the toplevel transaction. */
1195  AssertChangeLsnOrder(txn);
1196 
1197  /*
1198  * Calculate the size of our heap: one element for every transaction that
1199  * contains changes. (Besides the transactions already in the reorder
1200  * buffer, we count the one we were directly passed.)
1201  */
1202  if (txn->nentries > 0)
1203  nr_txns++;
1204 
1205  dlist_foreach(cur_txn_i, &txn->subtxns)
1206  {
1207  ReorderBufferTXN *cur_txn;
1208 
1209  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1210 
1211  /* Check ordering of changes in this subtransaction. */
1212  AssertChangeLsnOrder(cur_txn);
1213 
1214  if (cur_txn->nentries > 0)
1215  nr_txns++;
1216  }
1217 
1218  /* allocate iteration state */
1219  state = (ReorderBufferIterTXNState *)
1221  sizeof(ReorderBufferIterTXNState) +
1222  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1223 
1224  state->nr_txns = nr_txns;
1225  dlist_init(&state->old_change);
1226 
1227  for (off = 0; off < state->nr_txns; off++)
1228  {
1229  state->entries[off].file.vfd = -1;
1230  state->entries[off].segno = 0;
1231  }
1232 
1233  /* allocate heap */
1234  state->heap = binaryheap_allocate(state->nr_txns,
1236  state);
1237 
1238  /* Now that the state fields are initialized, it is safe to return it. */
1239  *iter_state = state;
1240 
1241  /*
1242  * Now insert items into the binary heap, in an unordered fashion. (We
1243  * will run a heap assembly step at the end; this is more efficient.)
1244  */
1245 
1246  off = 0;
1247 
1248  /* add toplevel transaction if it contains changes */
1249  if (txn->nentries > 0)
1250  {
1251  ReorderBufferChange *cur_change;
1252 
1253  if (rbtxn_is_serialized(txn))
1254  {
1255  /* serialize remaining changes */
1256  ReorderBufferSerializeTXN(rb, txn);
1257  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1258  &state->entries[off].segno);
1259  }
1260 
1261  cur_change = dlist_head_element(ReorderBufferChange, node,
1262  &txn->changes);
1263 
1264  state->entries[off].lsn = cur_change->lsn;
1265  state->entries[off].change = cur_change;
1266  state->entries[off].txn = txn;
1267 
1269  }
1270 
1271  /* add subtransactions if they contain changes */
1272  dlist_foreach(cur_txn_i, &txn->subtxns)
1273  {
1274  ReorderBufferTXN *cur_txn;
1275 
1276  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1277 
1278  if (cur_txn->nentries > 0)
1279  {
1280  ReorderBufferChange *cur_change;
1281 
1282  if (rbtxn_is_serialized(cur_txn))
1283  {
1284  /* serialize remaining changes */
1285  ReorderBufferSerializeTXN(rb, cur_txn);
1286  ReorderBufferRestoreChanges(rb, cur_txn,
1287  &state->entries[off].file,
1288  &state->entries[off].segno);
1289  }
1290  cur_change = dlist_head_element(ReorderBufferChange, node,
1291  &cur_txn->changes);
1292 
1293  state->entries[off].lsn = cur_change->lsn;
1294  state->entries[off].change = cur_change;
1295  state->entries[off].txn = cur_txn;
1296 
1298  }
1299  }
1300 
1301  /* assemble a valid binary heap */
1302  binaryheap_build(state->heap);
1303 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:317
size_t Size
Definition: c.h:540
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1312 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1313 {
1314  ReorderBufferChange *change;
1316  int32 off;
1317 
1318  /* nothing there anymore */
1319  if (state->heap->bh_size == 0)
1320  return NULL;
1321 
1322  off = DatumGetInt32(binaryheap_first(state->heap));
1323  entry = &state->entries[off];
1324 
1325  /* free memory we might have "leaked" in the previous *Next call */
1326  if (!dlist_is_empty(&state->old_change))
1327  {
1328  change = dlist_container(ReorderBufferChange, node,
1329  dlist_pop_head_node(&state->old_change));
1330  ReorderBufferReturnChange(rb, change, true);
1331  Assert(dlist_is_empty(&state->old_change));
1332  }
1333 
1334  change = entry->change;
1335 
1336  /*
1337  * update heap with information about which transaction has the next
1338  * relevant change in LSN order
1339  */
1340 
1341  /* there are in-memory changes */
1342  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1343  {
1344  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1345  ReorderBufferChange *next_change =
1346  dlist_container(ReorderBufferChange, node, next);
1347 
1348  /* txn stays the same */
1349  state->entries[off].lsn = next_change->lsn;
1350  state->entries[off].change = next_change;
1351 
1353  return change;
1354  }
1355 
1356  /* try to load changes from disk */
1357  if (entry->txn->nentries != entry->txn->nentries_mem)
1358  {
1359  /*
1360  * Ugly: restoring changes will reuse *Change records, thus delete the
1361  * current one from the per-tx list and only free in the next call.
1362  */
1363  dlist_delete(&change->node);
1364  dlist_push_tail(&state->old_change, &change->node);
1365 
1366  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1367  &state->entries[off].segno))
1368  {
1369  /* successfully restored changes from disk */
1370  ReorderBufferChange *next_change =
1372  &entry->txn->changes);
1373 
1374  elog(DEBUG2, "restored %u/%u changes from disk",
1375  (uint32) entry->txn->nentries_mem,
1376  (uint32) entry->txn->nentries);
1377 
1378  Assert(entry->txn->nentries_mem);
1379  /* txn stays the same */
1380  state->entries[off].lsn = next_change->lsn;
1381  state->entries[off].change = next_change;
1383 
1384  return change;
1385  }
1386  }
1387 
1388  /* ok, no changes there anymore, remove */
1389  binaryheap_remove_first(state->heap);
1390 
1391  return change;
1392 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:429
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:227
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3348 of file reorderbuffer.c.

References dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_incomplete_tuple, ReorderBuffer::toplevel_by_lsn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3349 {
3350  dlist_iter iter;
3351  Size largest_size = 0;
3352  ReorderBufferTXN *largest = NULL;
3353 
3354  /* Find the largest top-level transaction. */
3355  dlist_foreach(iter, &rb->toplevel_by_lsn)
3356  {
3357  ReorderBufferTXN *txn;
3358 
3359  txn = dlist_container(ReorderBufferTXN, node, iter.cur);
3360 
3361  if ((largest != NULL || txn->total_size > largest_size) &&
3362  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3363  {
3364  largest = txn;
3365  largest_size = txn->total_size;
3366  }
3367  }
3368 
3369  return largest;
3370 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define rbtxn_has_incomplete_tuple(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
size_t Size
Definition: c.h:540

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3304 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3305 {
3306  HASH_SEQ_STATUS hash_seq;
3308  ReorderBufferTXN *largest = NULL;
3309 
3310  hash_seq_init(&hash_seq, rb->by_txn);
3311  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3312  {
3313  ReorderBufferTXN *txn = ent->txn;
3314 
3315  /* if the current transaction is larger, remember it */
3316  if ((!largest) || (txn->size > largest->size))
3317  largest = txn;
3318  }
3319 
3320  Assert(largest);
3321  Assert(largest->size > 0);
3322  Assert(largest->size <= rb->size);
3323 
3324  return largest;
3325 }
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
ReorderBufferTXN * txn

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2647 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, pstrdup(), RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

2649 {
2650  ReorderBufferTXN *txn;
2651 
2652  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2653  false);
2654 
2655  /* unknown transaction, nothing to replay */
2656  if (txn == NULL)
2657  return;
2658 
2659  txn->txn_flags |= RBTXN_PREPARE;
2660  txn->gid = pstrdup(gid);
2661 
2662  /* The prepare info must have been updated in txn by now. */
2663  Assert(txn->final_lsn != InvalidXLogRecPtr);
2664 
2665  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2666  txn->commit_time, txn->origin_id, txn->origin_lsn);
2667 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
char * pstrdup(const char *in)
Definition: mcxt.c:1187
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
ReorderBufferChange * change,
bool  toast_insert 
)
static

Definition at line 686 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, IsInsertOrUpdate, IsSpecConfirm, IsSpecInsert, rbtxn_has_incomplete_tuple, RBTXN_HAS_SPEC_INSERT, rbtxn_has_spec_insert, RBTXN_HAS_TOAST_INSERT, rbtxn_has_toast_insert, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

689 {
690  ReorderBufferTXN *toptxn;
691 
692  /*
693  * The partial changes need to be processed only while streaming
694  * in-progress transactions.
695  */
696  if (!ReorderBufferCanStream(rb))
697  return;
698 
699  /* Get the top transaction. */
700  if (txn->toptxn != NULL)
701  toptxn = txn->toptxn;
702  else
703  toptxn = txn;
704 
705  /*
706  * Set the toast insert bit whenever we get toast insert to indicate a
707  * partial change and clear it when we get the insert or update on main
708  * table (Both update and insert will do the insert in the toast table).
709  */
710  if (toast_insert)
711  toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
712  else if (rbtxn_has_toast_insert(toptxn) &&
713  IsInsertOrUpdate(change->action))
714  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
715 
716  /*
717  * Set the spec insert bit whenever we get the speculative insert to
718  * indicate the partial change and clear the same on speculative confirm.
719  */
720  if (IsSpecInsert(change->action))
721  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
722  else if (IsSpecConfirm(change->action))
723  {
724  /*
725  * Speculative confirm change must be preceded by speculative
726  * insertion.
727  */
728  Assert(rbtxn_has_spec_insert(toptxn));
729  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
730  }
731 
732  /*
733  * Stream the transaction if it is serialized before and the changes are
734  * now complete in the top-level transaction.
735  *
736  * The reason for doing the streaming of such a transaction as soon as we
737  * get the complete change for it is that previously it would have reached
738  * the memory threshold and wouldn't get streamed because of incomplete
739  * changes. Delaying such transactions would increase apply lag for them.
740  */
741  if (ReorderBufferCanStartStreaming(rb) &&
742  !(rbtxn_has_incomplete_tuple(toptxn)) &&
743  rbtxn_is_serialized(txn))
744  ReorderBufferStreamTXN(rb, toptxn);
745 }
#define RBTXN_HAS_TOAST_INSERT
#define rbtxn_has_incomplete_tuple(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_toast_insert(txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
#define rbtxn_has_spec_insert(txn)
#define Assert(condition)
Definition: c.h:804
#define IsSpecConfirm(action)
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define RBTXN_HAS_SPEC_INSERT
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2001 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), 
ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

2006 {
2007  bool using_subtxn;
2008  MemoryContext ccxt = CurrentMemoryContext;
2009  ReorderBufferIterTXNState *volatile iterstate = NULL;
2010  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2011  ReorderBufferChange *volatile specinsert = NULL;
2012  volatile bool stream_started = false;
2013  ReorderBufferTXN *volatile curtxn = NULL;
2014 
2015  /* build data to be able to lookup the CommandIds of catalog tuples */
2016  ReorderBufferBuildTupleCidHash(rb, txn);
2017 
2018  /* setup the initial snapshot */
2019  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2020 
2021  /*
2022  * Decoding needs access to syscaches et al., which in turn use
2023  * heavyweight locks and such. Thus we need to have enough state around to
2024  * keep track of those. The easiest way is to simply use a transaction
2025  * internally. That also allows us to easily enforce that nothing writes
2026  * to the database by checking for xid assignments.
2027  *
2028  * When we're called via the SQL SRF there's already a transaction
2029  * started, so start an explicit subtransaction there.
2030  */
2031  using_subtxn = IsTransactionOrTransactionBlock();
2032 
2033  PG_TRY();
2034  {
2035  ReorderBufferChange *change;
2036 
2037  if (using_subtxn)
2038  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2039  else
2040  StartTransactionCommand();
2041 
2042  /*
2043  * We only need to send begin/begin-prepare for non-streamed
2044  * transactions.
2045  */
2046  if (!streaming)
2047  {
2048  if (rbtxn_prepared(txn))
2049  rb->begin_prepare(rb, txn);
2050  else
2051  rb->begin(rb, txn);
2052  }
2053 
2054  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2055  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2056  {
2057  Relation relation = NULL;
2058  Oid reloid;
2059 
2060  /*
2061  * We can't call start stream callback before processing first
2062  * change.
2063  */
2064  if (prev_lsn == InvalidXLogRecPtr)
2065  {
2066  if (streaming)
2067  {
2068  txn->origin_id = change->origin_id;
2069  rb->stream_start(rb, txn, change->lsn);
2070  stream_started = true;
2071  }
2072  }
2073 
2074  /*
2075  * Enforce correct ordering of changes, merged from multiple
2076  * subtransactions. The changes may have the same LSN due to
2077  * MULTI_INSERT xlog records.
2078  */
2079  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2080 
2081  prev_lsn = change->lsn;
2082 
2083  /*
2084  * Set the current xid to detect concurrent aborts. This is
2085  * required for the cases when we decode the changes before the
2086  * COMMIT record is processed.
2087  */
2088  if (streaming || rbtxn_prepared(change->txn))
2089  {
2090  curtxn = change->txn;
2091  SetupCheckXidLive(curtxn->xid);
2092  }
2093 
2094  switch (change->action)
2095  {
2096  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2097 
2098  /*
2099  * Confirmation for speculative insertion arrived. Simply
2100  * use as a normal record. It'll be cleaned up at the end
2101  * of INSERT processing.
2102  */
2103  if (specinsert == NULL)
2104  elog(ERROR, "invalid ordering of speculative insertion changes");
2105  Assert(specinsert->data.tp.oldtuple == NULL);
2106  change = specinsert;
2107  change->action = REORDER_BUFFER_CHANGE_INSERT;
2108 
2109  /* intentionally fall through */
2110  case REORDER_BUFFER_CHANGE_INSERT:
2111  case REORDER_BUFFER_CHANGE_UPDATE:
2112  case REORDER_BUFFER_CHANGE_DELETE:
2113  Assert(snapshot_now);
2114 
2115  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2116  change->data.tp.relnode.relNode);
2117 
2118  /*
2119  * Mapped catalog tuple without data, emitted while
2120  * catalog table was in the process of being rewritten. We
2121  * can fail to look up the relfilenode, because the
2122  * relmapper has no "historic" view, in contrast to the
2123  * normal catalog during decoding. Thus repeated rewrites
2124  * can cause a lookup failure. That's OK because we do not
2125  * decode catalog changes anyway. Normally such tuples
2126  * would be skipped over below, but we can't identify
2127  * whether the table should be logically logged without
2128  * mapping the relfilenode to the oid.
2129  */
2130  if (reloid == InvalidOid &&
2131  change->data.tp.newtuple == NULL &&
2132  change->data.tp.oldtuple == NULL)
2133  goto change_done;
2134  else if (reloid == InvalidOid)
2135  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2136  relpathperm(change->data.tp.relnode,
2137  MAIN_FORKNUM));
2138 
2139  relation = RelationIdGetRelation(reloid);
2140 
2141  if (!RelationIsValid(relation))
2142  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2143  reloid,
2144  relpathperm(change->data.tp.relnode,
2145  MAIN_FORKNUM));
2146 
2147  if (!RelationIsLogicallyLogged(relation))
2148  goto change_done;
2149 
2150  /*
2151  * Ignore temporary heaps created during DDL unless the
2152  * plugin has asked for them.
2153  */
2154  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2155  goto change_done;
2156 
2157  /*
2158  * For now ignore sequence changes entirely. Most of the
2159  * time they don't log changes using records we
2160  * understand, so it doesn't make sense to handle the few
2161  * cases we do.
2162  */
2163  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2164  goto change_done;
2165 
2166  /* user-triggered change */
2167  if (!IsToastRelation(relation))
2168  {
2169  ReorderBufferToastReplace(rb, txn, relation, change);
2170  ReorderBufferApplyChange(rb, txn, relation, change,
2171  streaming);
2172 
2173  /*
2174  * Only clear reassembled toast chunks if we're sure
2175  * they're not required anymore. The creator of the
2176  * tuple tells us.
2177  */
2178  if (change->data.tp.clear_toast_afterwards)
2179  ReorderBufferToastReset(rb, txn);
2180  }
2181  /* we're not interested in toast deletions */
2182  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2183  {
2184  /*
2185  * Need to reassemble the full toasted Datum in
2186  * memory; to ensure the chunks don't get reused till
2187  * we're done, remove it from the list of this
2188  * transaction's changes. Otherwise it will get
2189  * freed/reused while restoring spooled data from
2190  * disk.
2191  */
2192  Assert(change->data.tp.newtuple != NULL);
2193 
2194  dlist_delete(&change->node);
2195  ReorderBufferToastAppendChunk(rb, txn, relation,
2196  change);
2197  }
2198 
2199  change_done:
2200 
2201  /*
2202  * Either speculative insertion was confirmed, or it was
2203  * unsuccessful and the record isn't needed anymore.
2204  */
2205  if (specinsert != NULL)
2206  {
2207  ReorderBufferReturnChange(rb, specinsert, true);
2208  specinsert = NULL;
2209  }
2210 
2211  if (RelationIsValid(relation))
2212  {
2213  RelationClose(relation);
2214  relation = NULL;
2215  }
2216  break;
2217 
2218  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2219 
2220  /*
2221  * Speculative insertions are dealt with by delaying the
2222  * processing of the insert until the confirmation record
2223  * arrives. For that we simply unlink the record from the
2224  * chain, so it does not get freed/reused while restoring
2225  * spooled data from disk.
2226  *
2227  * This is safe in the face of concurrent catalog changes
2228  * because the relevant relation can't be changed between
2229  * speculative insertion and confirmation due to
2230  * CheckTableNotInUse() and locking.
2231  */
2232 
2233  /* clear out a pending (and thus failed) speculation */
2234  if (specinsert != NULL)
2235  {
2236  ReorderBufferReturnChange(rb, specinsert, true);
2237  specinsert = NULL;
2238  }
2239 
2240  /* and memorize the pending insertion */
2241  dlist_delete(&change->node);
2242  specinsert = change;
2243  break;
2244 
2245  case REORDER_BUFFER_CHANGE_TRUNCATE:
2246  {
2247  int i;
2248  int nrelids = change->data.truncate.nrelids;
2249  int nrelations = 0;
2250  Relation *relations;
2251 
2252  relations = palloc0(nrelids * sizeof(Relation));
2253  for (i = 0; i < nrelids; i++)
2254  {
2255  Oid relid = change->data.truncate.relids[i];
2256  Relation relation;
2257 
2258  relation = RelationIdGetRelation(relid);
2259 
2260  if (!RelationIsValid(relation))
2261  elog(ERROR, "could not open relation with OID %u", relid);
2262 
2263  if (!RelationIsLogicallyLogged(relation))
2264  continue;
2265 
2266  relations[nrelations++] = relation;
2267  }
2268 
2269  /* Apply the truncate. */
2270  ReorderBufferApplyTruncate(rb, txn, nrelations,
2271  relations, change,
2272  streaming);
2273 
2274  for (i = 0; i < nrelations; i++)
2275  RelationClose(relations[i]);
2276 
2277  break;
2278  }
2279 
2280  case REORDER_BUFFER_CHANGE_MESSAGE:
2281  ReorderBufferApplyMessage(rb, txn, change, streaming);
2282  break;
2283 
2284  case REORDER_BUFFER_CHANGE_INVALIDATION:
2285  /* Execute the invalidation messages locally */
2286  ReorderBufferExecuteInvalidations(
2287  change->data.inval.ninvalidations,
2288  change->data.inval.invalidations);
2289  break;
2290 
2291  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2292  /* get rid of the old */
2293  TeardownHistoricSnapshot(false);
2294 
2295  if (snapshot_now->copied)
2296  {
2297  ReorderBufferFreeSnap(rb, snapshot_now);
2298  snapshot_now =
2299  ReorderBufferCopySnap(rb, change->data.snapshot,
2300  txn, command_id);
2301  }
2302 
2303  /*
2304  * Restored from disk, need to be careful not to double
2305  * free. We could introduce refcounting for that, but for
2306  * now this seems infrequent enough not to care.
2307  */
2308  else if (change->data.snapshot->copied)
2309  {
2310  snapshot_now =
2311  ReorderBufferCopySnap(rb, change->data.snapshot,
2312  txn, command_id);
2313  }
2314  else
2315  {
2316  snapshot_now = change->data.snapshot;
2317  }
2318 
2319  /* and continue with the new one */
2320  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2321  break;
2322 
2323  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2324  Assert(change->data.command_id != InvalidCommandId);
2325 
2326  if (command_id < change->data.command_id)
2327  {
2328  command_id = change->data.command_id;
2329 
2330  if (!snapshot_now->copied)
2331  {
2332  /* we don't use the global one anymore */
2333  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2334  txn, command_id);
2335  }
2336 
2337  snapshot_now->curcid = command_id;
2338 
2339  TeardownHistoricSnapshot(false);
2340  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2341  }
2342 
2343  break;
2344 
2345  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2346  elog(ERROR, "tuplecid value in changequeue");
2347  break;
2348  }
2349  }
2350 
2351  /*
2352  * There's a speculative insertion remaining, just clean it up, it
2353  * can't have been successful, otherwise we'd have gotten a confirmation
2354  * record.
2355  */
2356  if (specinsert)
2357  {
2358  ReorderBufferReturnChange(rb, specinsert, true);
2359  specinsert = NULL;
2360  }
2361 
2362  /* clean up the iterator */
2363  ReorderBufferIterTXNFinish(rb, iterstate);
2364  iterstate = NULL;
2365 
2366  /*
2367  * Done with current changes, send the last message for this set of
2368  * changes depending upon streaming mode.
2369  */
2370  if (streaming)
2371  {
2372  if (stream_started)
2373  {
2374  rb->stream_stop(rb, txn, prev_lsn);
2375  stream_started = false;
2376  }
2377  }
2378  else
2379  {
2380  /*
2381  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2382  * regular ones).
2383  */
2384  if (rbtxn_prepared(txn))
2385  rb->prepare(rb, txn, commit_lsn);
2386  else
2387  rb->commit(rb, txn, commit_lsn);
2388  }
2389 
2390  /* this is just a sanity check against bad output plugin behaviour */
2391  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2392  elog(ERROR, "output plugin used XID %u",
2393  GetCurrentTransactionId());
2394 
2395  /*
2396  * Remember the command ID and snapshot for the next set of changes in
2397  * streaming mode.
2398  */
2399  if (streaming)
2400  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2401  else if (snapshot_now->copied)
2402  ReorderBufferFreeSnap(rb, snapshot_now);
2403 
2404  /* cleanup */
2405  TeardownHistoricSnapshot(false);
2406 
2407  /*
2408  * Aborting the current (sub-)transaction as a whole has the right
2409  * semantics. We want all locks acquired in here to be released, not
2410  * reassigned to the parent and we do not want any database access
2411  * have persistent effects.
2412  */
2413  AbortCurrentTransaction();
2414 
2415  /* make sure there's no cache pollution */
2416  ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2417 
2418  if (using_subtxn)
2419  RollbackAndReleaseCurrentSubTransaction();
2420 
2421  /*
2422  * We are here due to one of the four reasons: 1. Decoding an
2423  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2424  * prepared txn that was (partially) streamed. 4. Decoding a committed
2425  * txn.
2426  *
2427  * For 1, we allow truncation of txn data by removing the changes
2428  * already streamed but still keeping other things like invalidations,
2429  * snapshot, and tuplecids. For 2 and 3, we indicate
2430  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2431  * data as the entire transaction has been decoded except for commit.
2432  * For 4, as the entire txn has been decoded, we can fully clean up
2433  * the TXN reorder buffer.
2434  */
2435  if (streaming || rbtxn_prepared(txn))
2436  {
2437  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2438  /* Reset the CheckXidAlive */
2439  CheckXidAlive = InvalidTransactionId;
2440  }
2441  else
2442  ReorderBufferCleanupTXN(rb, txn);
2443  }
2444  PG_CATCH();
2445  {
2446  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2447  ErrorData *errdata = CopyErrorData();
2448 
2449  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2450  if (iterstate)
2451  ReorderBufferIterTXNFinish(rb, iterstate);
2452 
2453  TeardownHistoricSnapshot(true);
2454 
2455  /*
2456  * Force cache invalidation to happen outside of a valid transaction
2457  * to prevent catalog access as we just caught an error.
2458  */
2459  AbortCurrentTransaction();
2460 
2461  /* make sure there's no cache pollution */
2462  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2463  txn->invalidations);
2464 
2465  if (using_subtxn)
2466  RollbackAndReleaseCurrentSubTransaction();
2467 
2468  /*
2469  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2470  * abort of the (sub)transaction we are streaming or preparing. We
2471  * need to do the cleanup and return gracefully on this error, see
2472  * SetupCheckXidLive.
2473  */
2474  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
2475  {
2476  /*
2477  * This error can occur either when we are sending the data in
2478  * streaming mode and the streaming is not finished yet or when we
2479  * are sending the data out on a PREPARE during a two-phase
2480  * commit.
2481  */
2482  Assert(streaming || rbtxn_prepared(txn));
2483  Assert(stream_started || rbtxn_prepared(txn));
2484 
2485  /* Cleanup the temporary error state. */
2486  FlushErrorState();
2487  FreeErrorData(errdata);
2488  errdata = NULL;
2489  curtxn->concurrent_abort = true;
2490 
2491  /* Reset the TXN so that it is allowed to stream remaining data. */
2492  ReorderBufferResetTXN(rb, txn, snapshot_now,
2493  command_id, prev_lsn,
2494  specinsert);
2495  }
2496  else
2497  {
2498  ReorderBufferCleanupTXN(rb, txn);
2499  MemoryContextSwitchTo(ecxt);
2500  PG_RE_THROW();
2501  }
2502  }
2503  PG_END_TRY();
2504 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsToastRelation(Relation relation)
Definition: catalog.c:138
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_prepared(txn)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:376
ErrorData * CopyErrorData(void)
Definition: elog.c:1533
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:636
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void FlushErrorState(void)
Definition: elog.c:1627
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1589
#define ERROR
Definition: elog.h:45
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define RelationIsValid(relation)
Definition: rel.h:430
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferBeginCB begin_prepare
void RelationClose(Relation relation)
Definition: relcache.c:2123
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:89
TransactionId CheckXidAlive
Definition: xact.c:95
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:981
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:604
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
struct ReorderBufferChange::@96::@97 tp
#define PG_CATCH()
Definition: elog.h:318
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2838
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
struct ReorderBufferChange::@96::@98 truncate
SharedInvalidationMessage * invalidations
#define PG_RE_THROW()
Definition: elog.h:349
struct ReorderBufferChange::@96::@101 inval
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:227
int i
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
union ReorderBufferChange::@96 data
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:308
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2017
#define PG_END_TRY()
Definition: elog.h:333
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2951 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2952 {
2953  /* Many records have no xid assigned; check once here and, only for a valid xid, look up its transaction (the call passes create = true). */
2954  if (xid != InvalidTransactionId)
2955  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2956 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange * change,
bool  toast_insert 
)

Definition at line 752 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

754 {
755  ReorderBufferTXN *txn;
756 
757  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
758 
759  /*
760  * While streaming the previous changes we have detected that the
761  * transaction is aborted. So there is no point in collecting further
762  * changes for it.
763  */
764  if (txn->concurrent_abort)
765  {
766  /*
767  * We don't need to update memory accounting for this change as we
768  * have not added it to the queue yet.
769  */
770  ReorderBufferReturnChange(rb, change, false);
771  return;
772  }
773 
774  change->lsn = lsn;
775  change->txn = txn;
776 
777  Assert(InvalidXLogRecPtr != lsn);
778  dlist_push_tail(&txn->changes, &change->node);
779  txn->nentries++;
780  txn->nentries_mem++;
781 
782  /* update memory accounting information */
783  ReorderBufferChangeMemoryUpdate(rb, change, true);
784 
785  /* process partial change */
786  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
787 
788  /* check the memory limits and evict something if needed */
789  ReorderBufferCheckMemoryLimit(rb);
790 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 797 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

/*
 * Listing of the body of ReorderBufferQueueMessage().
 *
 * Transactional messages are copied into a freshly obtained
 * ReorderBufferChange and handed to ReorderBufferQueueChange().
 * Non-transactional messages are passed to the rb->message callback
 * immediately, after a historic snapshot has been set up.
 *
 * NOTE(review): the listing numbering skips lines 807, 812, 836 and 840, so
 * some statements are not visible here. Confirm against the real source
 * before relying on this listing.
 */
801 {
802  if (transactional)
803  {
804  MemoryContext oldcontext;
805  ReorderBufferChange *change;
806 
/*
 * NOTE(review): listing line 807 is missing. References names Assert, so it
 * is presumably an assertion - confirm.
 */
808 
/*
 * The statements up to the second MemoryContextSwitchTo() run while
 * rb->context is the current memory context; the previous context is
 * restored afterwards.
 */
809  oldcontext = MemoryContextSwitchTo(rb->context);
810 
811  change = ReorderBufferGetChange(rb);
/*
 * NOTE(review): listing line 812 is missing. Presumably it sets
 * change->action (References names REORDER_BUFFER_CHANGE_MESSAGE) - confirm.
 */
/* The prefix is duplicated and the payload is copied byte for byte. */
813  change->data.msg.prefix = pstrdup(prefix);
814  change->data.msg.message_size = message_size;
815  change->data.msg.message = palloc(message_size);
816  memcpy(change->data.msg.message, message, message_size);
817 
818  ReorderBufferQueueChange(rb, xid, lsn, change, false);
819 
820  MemoryContextSwitchTo(oldcontext);
821  }
822  else
823  {
/* txn stays NULL when xid is InvalidTransactionId. */
824  ReorderBufferTXN *txn = NULL;
825  volatile Snapshot snapshot_now = snapshot;
826 
827  if (xid != InvalidTransactionId)
828  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
829 
830  /* setup snapshot to allow catalog access */
831  SetupHistoricSnapshot(snapshot_now, NULL);
832  PG_TRY();
833  {
834  rb->message(rb, txn, lsn, false, prefix, message_size, message);
835 
/*
 * NOTE(review): listing line 836 is missing. Presumably the historic
 * snapshot is torn down here (References names TeardownHistoricSnapshot) -
 * confirm.
 */
837  }
838  PG_CATCH();
839  {
/*
 * NOTE(review): listing line 840 is missing. Presumably the error-path
 * teardown of the historic snapshot happens before the re-throw - confirm.
 */
841  PG_RE_THROW();
842  }
843  PG_END_TRY();
844  }
845 }
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
struct ReorderBufferChange::@96::@99 msg
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:318
#define Assert(condition)
Definition: c.h:804
#define PG_RE_THROW()
Definition: elog.h:349
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
union ReorderBufferChange::@96 data
#define PG_TRY()
Definition: elog.h:308
#define PG_END_TRY()
Definition: elog.h:333

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2600 of file reorderbuffer.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

/*
 * Listing of the body of ReorderBufferRememberPrepareInfo().
 *
 * Looks up the transaction for xid without creating it (the create argument
 * is false). Returns false when no such transaction is known; otherwise
 * stores the prepare details in the transaction and returns true.
 */
2604 {
2605  ReorderBufferTXN *txn;
2606 
2607  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2608 
2609  /* unknown transaction, nothing to do */
2610  if (txn == NULL)
2611  return false;
2612 
2613  /*
2614  * Remember the prepare information to be later used by commit prepared in
2615  * case we skip doing prepare.
2616  */
/*
 * Note the field mapping: prepare_lsn is kept in final_lsn and prepare_time
 * is kept in commit_time.
 */
2617  txn->final_lsn = prepare_lsn;
2618  txn->end_lsn = end_lsn;
2619  txn->commit_time = prepare_time;
2620  txn->origin_id = origin_id;
2621  txn->origin_lsn = origin_lsn;
2622 
2623  return true;
2624 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2517 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

/*
 * Listing of the body of ReorderBufferReplay().
 *
 * Stores the commit details in txn and then takes one of three paths:
 *   - a streamed transaction is finished via ReorderBufferStreamCommit();
 *   - a transaction without a base snapshot is skipped, and cleaned up
 *     unless it is prepared;
 *   - anything else is processed via ReorderBufferProcessTXN(), starting
 *     from the base snapshot and FirstCommandId.
 *
 * The xid parameter is not referenced in the visible body.
 */
2522 {
2523  Snapshot snapshot_now;
2524  CommandId command_id = FirstCommandId;
2525 
2526  txn->final_lsn = commit_lsn;
2527  txn->end_lsn = end_lsn;
2528  txn->commit_time = commit_time;
2529  txn->origin_id = origin_id;
2530  txn->origin_lsn = origin_lsn;
2531 
2532  /*
2533  * If the transaction was (partially) streamed, we need to commit it in a
2534  * 'streamed' way. That is, we first stream the remaining part of the
2535  * transaction, and then invoke stream_commit message.
2536  *
2537  * Called after everything (origin ID, LSN, ...) is stored in the
2538  * transaction to avoid passing that information directly.
2539  */
2540  if (rbtxn_is_streamed(txn))
2541  {
2542  ReorderBufferStreamCommit(rb, txn);
2543  return;
2544  }
2545 
2546  /*
2547  * If this transaction has no snapshot, it didn't make any changes to the
2548  * database, so there's nothing to decode. Note that
2549  * ReorderBufferCommitChild will have transferred any snapshots from
2550  * subtransactions if there were any.
2551  */
2552  if (txn->base_snapshot == NULL)
2553  {
2554  Assert(txn->ninvalidations == 0);
2555 
2556  /*
2557  * Removing this txn before a commit might result in the computation
2558  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2559  */
2560  if (!rbtxn_prepared(txn))
2561  ReorderBufferCleanupTXN(rb, txn);
2562  return;
2563  }
2564 
2565  snapshot_now = txn->base_snapshot;
2566 
2567  /* Process and send the changes to output plugin. */
/* The final argument (streaming) is false on this path. */
2568  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2569  command_id, false);
2570 }
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
Snapshot base_snapshot
#define rbtxn_prepared(txn)
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:603
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *  specinsert 
)
static

Definition at line 1958 of file reorderbuffer.c.

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

/*
 * Listing of the body of ReorderBufferResetTXN().
 *
 * Releases per-run state of txn: toast reconstruction data is reset, a
 * pending speculative-insert change is returned, and for a streamed
 * transaction the stream is stopped at last_lsn and the snapshot and
 * command id are saved.
 *
 * NOTE(review): the listing numbering skips line 1965, so the statement that
 * belongs to the first comment is not visible here.
 */
1963 {
1964  /* Discard the changes that we just streamed */
/*
 * NOTE(review): listing line 1965 is missing. Presumably it is the call that
 * discards the changes (References names ReorderBufferTruncateTXN and
 * rbtxn_prepared) - confirm.
 */
1966 
1967  /* Free all resources allocated for toast reconstruction */
1968  ReorderBufferToastReset(rb, txn);
1969 
1970  /* Return the spec insert change if it is not NULL */
1971  if (specinsert != NULL)
1972  {
1973  ReorderBufferReturnChange(rb, specinsert, true);
/* This only clears the local copy of the parameter. */
1974  specinsert = NULL;
1975  }
1976 
1977  /*
1978  * For the streaming case, stop the stream and remember the command ID and
1979  * snapshot for the streaming run.
1980  */
1981  if (rbtxn_is_streamed(txn))
1982  {
1983  rb->stream_stop(rb, txn, last_lsn);
1984  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1985  }
1986 }
#define rbtxn_prepared(txn)
#define rbtxn_is_streamed(txn)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
char *  change 
)
static

Definition at line 4125 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

/*
 * Listing of the body of ReorderBufferRestoreChange().
 *
 * Rebuilds one in-memory ReorderBufferChange from a serialized buffer: the
 * fixed-size part is copied first, then the variable-size payload that
 * follows the ReorderBufferDiskChange header is restored according to
 * change->action. The change is appended to txn->changes and added to the
 * memory accounting.
 *
 * NOTE(review): the body calls the serialized buffer data, while the
 * signature shown in this document names the third parameter change.
 * Presumably the definition uses data - confirm.
 *
 * NOTE(review): the listing numbering skips lines 4144-4147, 4153, 4178,
 * 4195, 4219, 4232, 4256 and 4267-4269. All case labels of the switch and
 * the right-hand sides of two tuple-buffer assignments are therefore not
 * visible. Confirm against the real source.
 */
4127 {
4128  ReorderBufferDiskChange *ondisk;
4129  ReorderBufferChange *change;
4130 
4131  ondisk = (ReorderBufferDiskChange *) data;
4132 
4133  change = ReorderBufferGetChange(rb);
4134 
4135  /* copy static part */
4136  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4137 
/* From here on data points just past the on-disk header. */
4138  data += sizeof(ReorderBufferDiskChange);
4139 
4140  /* restore individual stuff */
4141  switch (change->action)
4142  {
4143  /* fall through these, they're all similar enough */
/*
 * NOTE(review): listing lines 4144-4147 are missing. Presumably they are the
 * case labels of the tuple-carrying actions (References names INSERT,
 * UPDATE, DELETE and INTERNAL_SPEC_INSERT) - confirm.
 */
/*
 * The copied pointer value is used only as a flag that an old tuple was
 * serialized; it is replaced below.
 */
4148  if (change->data.tp.oldtuple)
4149  {
4150  uint32 tuplelen = ((HeapTuple) data)->t_len;
4151 
4152  change->data.tp.oldtuple =
/*
 * NOTE(review): listing line 4153, the right-hand side of this assignment,
 * is missing. Presumably it is a ReorderBufferGetTupleBuf() call - confirm.
 */
4154 
4155  /* restore ->tuple */
4156  memcpy(&change->data.tp.oldtuple->tuple, data,
4157  sizeof(HeapTupleData));
4158  data += sizeof(HeapTupleData);
4159 
4160  /* reset t_data pointer into the new tuplebuf */
4161  change->data.tp.oldtuple->tuple.t_data =
4162  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4163 
4164  /* restore tuple data itself */
4165  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4166  data += tuplelen;
4167  }
4168 
4169  if (change->data.tp.newtuple)
4170  {
4171  /* here, data might not be suitably aligned! */
4172  uint32 tuplelen;
4173 
/* Hence t_len is fetched with memcpy rather than through a cast. */
4174  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4175  sizeof(uint32));
4176 
4177  change->data.tp.newtuple =
/*
 * NOTE(review): listing line 4178, the right-hand side of this assignment,
 * is missing. Presumably it is a ReorderBufferGetTupleBuf() call - confirm.
 */
4179 
4180  /* restore ->tuple */
4181  memcpy(&change->data.tp.newtuple->tuple, data,
4182  sizeof(HeapTupleData));
4183  data += sizeof(HeapTupleData);
4184 
4185  /* reset t_data pointer into the new tuplebuf */
4186  change->data.tp.newtuple->tuple.t_data =
4187  ReorderBufferTupleBufData(change->data.tp.newtuple);
4188 
4189  /* restore tuple data itself */
4190  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4191  data += tuplelen;
4192  }
4193 
4194  break;
/*
 * NOTE(review): listing line 4195 is missing. Presumably it is the case
 * label for REORDER_BUFFER_CHANGE_MESSAGE - confirm.
 */
/*
 * Serialized layout read here: prefix size, prefix bytes (asserted to end in
 * a NUL byte), message size, message bytes.
 */
4196  {
4197  Size prefix_size;
4198 
4199  /* read prefix */
4200  memcpy(&prefix_size, data, sizeof(Size));
4201  data += sizeof(Size);
4202  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4203  prefix_size);
4204  memcpy(change->data.msg.prefix, data, prefix_size);
4205  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4206  data += prefix_size;
4207 
4208  /* read the message */
4209  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4210  data += sizeof(Size);
4211  change->data.msg.message = MemoryContextAlloc(rb->context,
4212  change->data.msg.message_size);
4213  memcpy(change->data.msg.message, data,
4214  change->data.msg.message_size);
4215  data += change->data.msg.message_size;
4216 
4217  break;
4218  }
/*
 * NOTE(review): listing line 4219 is missing. Presumably it is the case
 * label for REORDER_BUFFER_CHANGE_INVALIDATION - confirm.
 */
/*
 * The invalidation count comes from the fixed part already copied into
 * change; only the message array itself is read from data.
 */
4220  {
4221  Size inval_size = sizeof(SharedInvalidationMessage) *
4222  change->data.inval.ninvalidations;
4223 
4224  change->data.inval.invalidations =
4225  MemoryContextAlloc(rb->context, inval_size);
4226 
4227  /* read the message */
4228  memcpy(change->data.inval.invalidations, data, inval_size);
4229 
4230  break;
4231  }
/*
 * NOTE(review): listing line 4232 is missing. Presumably it is the case
 * label for REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT - confirm.
 */
/*
 * The snapshot is copied as one block: the SnapshotData header followed by
 * xcnt and then subxcnt transaction ids. The xip and subxip pointers are
 * then redirected into the new block.
 */
4233  {
4234  Snapshot oldsnap;
4235  Snapshot newsnap;
4236  Size size;
4237 
4238  oldsnap = (Snapshot) data;
4239 
4240  size = sizeof(SnapshotData) +
4241  sizeof(TransactionId) * oldsnap->xcnt +
4242  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4243 
4244  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4245 
4246  newsnap = change->data.snapshot;
4247 
4248  memcpy(newsnap, data, size);
4249  newsnap->xip = (TransactionId *)
4250  (((char *) newsnap) + sizeof(SnapshotData));
4251  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4252  newsnap->copied = true;
4253  break;
4254  }
4255  /* the base struct contains all the data, easy peasy */
/*
 * NOTE(review): listing line 4256 is missing. Presumably it is the case
 * label for REORDER_BUFFER_CHANGE_TRUNCATE. The comment on line 4255 reads
 * as if it belongs to the labels missing near line 4267 - confirm.
 */
4257  {
4258  Oid *relids;
4259 
4260  relids = ReorderBufferGetRelids(rb,
4261  change->data.truncate.nrelids);
4262  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4263  change->data.truncate.relids = relids;
4264 
4265  break;
4266  }
/*
 * NOTE(review): listing lines 4267-4269 are missing. Presumably they are the
 * case labels of actions that need no extra payload (References names
 * INTERNAL_SPEC_CONFIRM, INTERNAL_COMMAND_ID and INTERNAL_TUPLECID) -
 * confirm.
 */
4270  break;
4271  }
4272 
4273  dlist_push_tail(&txn->changes, &change->node);
4274  txn->nentries_mem++;
4275 
4276  /*
4277  * Update memory accounting for the restored change. We need to do this
4278  * although we don't check the memory limit when restoring the changes in
4279  * this branch (we only do that when initially queueing the changes after
4280  * decoding), because we will release the changes later, and that will
4281  * update the accounting too (subtracting the size from the counters). And
4282  * we don't want to underflow there.
4283  */
4284  ReorderBufferChangeMemoryUpdate(rb, change, true);
4285 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:587
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:441
TransactionId * xip
Definition: snapshot.h:168
struct ReorderBufferChange::@96::@99 msg
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
struct ReorderBufferDiskChange ReorderBufferDiskChange
struct ReorderBufferChange::@96::@97 tp
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@96::@98 truncate
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@96::@101 inval
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
union ReorderBufferChange::@96 data
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:727
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
TXNEntryFile *  file,
XLogSegNo *  segno 
)
static

Definition at line 3984 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

/*
 * Listing of the body of ReorderBufferRestoreChanges().
 *
 * Frees the changes currently linked into txn->changes, then reads
 * serialized changes back in, opening one spill file per segment number,
 * until either max_changes_in_memory changes were restored or *segno has
 * moved past the segment that contains txn->final_lsn. Returns the number
 * of changes restored. *segno and the file state (vfd, curOffset) are
 * updated in place.
 *
 * NOTE(review): the listing numbering skips lines 3992-3993, 3998, 4028,
 * 4044, 4054, 4057, 4069, 4073, 4082, 4090, 4094 and 4098. Several calls
 * therefore appear without their first or last line. Confirm against the
 * real source.
 */
3986 {
3987  Size restored = 0;
3988  XLogSegNo last_segno;
3989  dlist_mutable_iter cleanup_iter;
/* fd aliases file->vfd, so assignments through it update the caller state. */
3990  File *fd = &file->vfd;
3991 
/*
 * NOTE(review): listing lines 3992-3993 are missing. References names
 * Assert and InvalidXLogRecPtr, so presumably sanity checks - confirm.
 */
3994 
3995  /* free current entries, so we have memory for more */
3996  dlist_foreach_modify(cleanup_iter, &txn->changes)
3997  {
/*
 * NOTE(review): listing line 3998 is missing. Presumably it declares the
 * cleanup variable that the next line initializes - confirm.
 */
3999  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4000 
4001  dlist_delete(&cleanup->node);
4002  ReorderBufferReturnChange(rb, cleanup, true);
4003  }
4004  txn->nentries_mem = 0;
4005  Assert(dlist_is_empty(&txn->changes));
4006 
4007  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4008 
4009  while (restored < max_changes_in_memory && *segno <= last_segno)
4010  {
4011  int readBytes;
4012  ReorderBufferDiskChange *ondisk;
4013 
/* -1 means no spill file is currently open. */
4014  if (*fd == -1)
4015  {
4016  char path[MAXPGPATH];
4017 
4018  /* first time in */
4019  if (*segno == 0)
4020  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4021 
4022  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4023 
4024  /*
4025  * No need to care about TLIs here, only used during a single run,
4026  * so each LSN only maps to a specific WAL record.
4027  */
/*
 * NOTE(review): listing line 4028 is missing. Presumably it is the start of
 * the ReorderBufferSerializedPath() call that fills path - confirm.
 */
4029  *segno);
4030 
4031  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4032 
4033  /* No harm in resetting the offset even in case of failure */
4034  file->curOffset = 0;
4035 
/* A missing file is not an error: continue with the next segment. */
4036  if (*fd < 0 && errno == ENOENT)
4037  {
4038  *fd = -1;
4039  (*segno)++;
4040  continue;
4041  }
4042  else if (*fd < 0)
4043  ereport(ERROR,
4045  errmsg("could not open file \"%s\": %m",
4046  path)));
4047  }
4048 
4049  /*
4050  * Read the statically sized part of a change which has information
4051  * about the total size. If we couldn't read a record, we're at the
4052  * end of this file.
4053  */
/*
 * NOTE(review): listing lines 4054 and 4057 are missing. Presumably 4054
 * reserves space in rb->outbuf (References names
 * ReorderBufferSerializeReserve) and 4057 holds the remaining FileRead()
 * arguments - confirm.
 */
4055  readBytes = FileRead(file->vfd, rb->outbuf,
4056  sizeof(ReorderBufferDiskChange),
4058 
4059  /* eof */
4060  if (readBytes == 0)
4061  {
4062  FileClose(*fd);
4063  *fd = -1;
4064  (*segno)++;
4065  continue;
4066  }
4067  else if (readBytes < 0)
4068  ereport(ERROR,
4070  errmsg("could not read from reorderbuffer spill file: %m")));
4071  else if (readBytes != sizeof(ReorderBufferDiskChange))
4072  ereport(ERROR,
4074  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4075  readBytes,
4076  (uint32) sizeof(ReorderBufferDiskChange))));
4077 
4078  file->curOffset += readBytes;
4079 
4080  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4081 
/*
 * NOTE(review): listing line 4082 is missing. It is the start of the call
 * whose last argument follows. ondisk is fetched again from rb->outbuf right
 * after it, presumably because that call may replace the buffer - confirm.
 */
4083  sizeof(ReorderBufferDiskChange) + ondisk->size);
4084  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4085 
/*
 * ondisk->size counts the header too, so only the remainder is read, placed
 * directly behind the header in rb->outbuf.
 */
4086  readBytes = FileRead(file->vfd,
4087  rb->outbuf + sizeof(ReorderBufferDiskChange),
4088  ondisk->size - sizeof(ReorderBufferDiskChange),
4089  file->curOffset,
4091 
4092  if (readBytes < 0)
4093  ereport(ERROR,
4095  errmsg("could not read from reorderbuffer spill file: %m")));
4096  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4097  ereport(ERROR,
4099  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4100  readBytes,
4101  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4102 
4103  file->curOffset += readBytes;
4104 
4105  /*
4106  * ok, read a full change from disk, now restore it into proper
4107  * in-memory format
4108  */
4109  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4110  restored++;
4111  }
4112 
4113  return restored;
4114 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1465
int wal_segment_size
Definition: xlog.c:118
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:45
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:717
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:441
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:862
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:155
struct ReorderBufferDiskChange ReorderBufferDiskChange
void FileClose(File file)
Definition: fd.c:1854
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289