PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirmOrAbort

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4876 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4877 {
4878  char path[MAXPGPATH];
4879  int fd;
4880  int readBytes;
4882 
4883  sprintf(path, "pg_logical/mappings/%s", fname);
4884  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4885  if (fd < 0)
4886  ereport(ERROR,
4888  errmsg("could not open file \"%s\": %m", path)));
4889 
4890  while (true)
4891  {
4894  ReorderBufferTupleCidEnt *new_ent;
4895  bool found;
4896 
4897  /* be careful about padding */
4898  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4899 
4900  /* read all mappings till the end of the file */
4902  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4904 
4905  if (readBytes < 0)
4906  ereport(ERROR,
4908  errmsg("could not read file \"%s\": %m",
4909  path)));
4910  else if (readBytes == 0) /* EOF */
4911  break;
4912  else if (readBytes != sizeof(LogicalRewriteMappingData))
4913  ereport(ERROR,
4915  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4916  path, readBytes,
4917  (int32) sizeof(LogicalRewriteMappingData))));
4918 
4919  key.relnode = map.old_node;
4920  ItemPointerCopy(&map.old_tid,
4921  &key.tid);
4922 
4923 
4924  ent = (ReorderBufferTupleCidEnt *)
4925  hash_search(tuplecid_data,
4926  (void *) &key,
4927  HASH_FIND,
4928  NULL);
4929 
4930  /* no existing mapping, no need to update */
4931  if (!ent)
4932  continue;
4933 
4934  key.relnode = map.new_node;
4935  ItemPointerCopy(&map.new_tid,
4936  &key.tid);
4937 
4938  new_ent = (ReorderBufferTupleCidEnt *)
4939  hash_search(tuplecid_data,
4940  (void *) &key,
4941  HASH_ENTER,
4942  &found);
4943 
4944  if (found)
4945  {
4946  /*
4947  * Make sure the existing mapping makes sense. We sometime update
4948  * old records that did not yet have a cmax (e.g. pg_class' own
4949  * entry while rewriting it) during rewrites, so allow that.
4950  */
4951  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4952  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4953  }
4954  else
4955  {
4956  /* update mapping */
4957  new_ent->cmin = ent->cmin;
4958  new_ent->cmax = ent->cmax;
4959  new_ent->combocid = ent->combocid;
4960  }
4961  }
4962 
4963  if (CloseTransientFile(fd) != 0)
4964  ereport(ERROR,
4966  errmsg("could not close file \"%s\": %m", path)));
4967 }
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
signed int int32
Definition: c.h:429
#define sprintf
Definition: port.h:218
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:262
int CloseTransientFile(int fd)
Definition: fd.c:2644
#define InvalidCommandId
Definition: c.h:604
#define ereport(elevel,...)
Definition: elog.h:157
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:804
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 925 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

926 {
927 #ifdef USE_ASSERT_CHECKING
928  dlist_iter iter;
929  XLogRecPtr prev_lsn = txn->first_lsn;
930 
931  dlist_foreach(iter, &txn->changes)
932  {
933  ReorderBufferChange *cur_change;
934 
935  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
936 
938  Assert(cur_change->lsn != InvalidXLogRecPtr);
939  Assert(txn->first_lsn <= cur_change->lsn);
940 
941  if (txn->end_lsn != InvalidXLogRecPtr)
942  Assert(cur_change->lsn <= txn->end_lsn);
943 
944  Assert(prev_lsn <= cur_change->lsn);
945 
946  prev_lsn = cur_change->lsn;
947  }
948 #endif
949 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 868 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

869 {
870 #ifdef USE_ASSERT_CHECKING
871  dlist_iter iter;
872  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
873  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
874 
875  dlist_foreach(iter, &rb->toplevel_by_lsn)
876  {
878  iter.cur);
879 
880  /* start LSN must be set */
881  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
882 
883  /* If there is an end LSN, it must be higher than start LSN */
884  if (cur_txn->end_lsn != InvalidXLogRecPtr)
885  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
886 
887  /* Current initial LSN must be strictly higher than previous */
888  if (prev_first_lsn != InvalidXLogRecPtr)
889  Assert(prev_first_lsn < cur_txn->first_lsn);
890 
891  /* known-as-subtxn txns must not be listed */
892  Assert(!rbtxn_is_known_subxact(cur_txn));
893 
894  prev_first_lsn = cur_txn->first_lsn;
895  }
896 
898  {
900  base_snapshot_node,
901  iter.cur);
902 
903  /* base snapshot (and its LSN) must be set */
904  Assert(cur_txn->base_snapshot != NULL);
906 
907  /* current LSN must be strictly higher than previous */
908  if (prev_base_snap_lsn != InvalidXLogRecPtr)
909  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
910 
911  /* known-as-subtxn txns must not be listed */
912  Assert(!rbtxn_is_known_subxact(cur_txn));
913 
914  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
915  }
916 #endif
917 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 4984 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4985 {
4988 
4989  if (a->lsn < b->lsn)
4990  return -1;
4991  else if (a->lsn > b->lsn)
4992  return 1;
4993  return 0;
4994 }
#define lfirst(lc)
Definition: pg_list.h:169

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2820 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2821 {
2822  ReorderBufferTXN *txn;
2823 
2824  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2825  false);
2826 
2827  /* unknown, nothing to remove */
2828  if (txn == NULL)
2829  return;
2830 
2831  /* For streamed transactions notify the remote node about the abort. */
2832  if (rbtxn_is_streamed(txn))
2833  {
2834  rb->stream_abort(rb, txn, lsn);
2835 
2836  /*
2837  * We might have decoded changes for this transaction that could load
2838  * the cache as per the current transaction's view (consider DDL's
2839  * happened in this transaction). We don't want the decoding of future
2840  * transactions to use those cache entries so execute invalidations.
2841  */
2842  if (txn->ninvalidations > 0)
2844  txn->invalidations);
2845  }
2846 
2847  /* cosmetic... */
2848  txn->final_lsn = lsn;
2849 
2850  /* remove potential on-disk data, and deallocate */
2851  ReorderBufferCleanupTXN(rb, txn);
2852 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2862 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2863 {
2864  dlist_mutable_iter it;
2865 
2866  /*
2867  * Iterate through all (potential) toplevel TXNs and abort all that are
2868  * older than what possibly can be running. Once we've found the first
2869  * that is alive we stop, there might be some that acquired an xid earlier
2870  * but started writing later, but it's unlikely and they will be cleaned
2871  * up in a later call to this function.
2872  */
2874  {
2875  ReorderBufferTXN *txn;
2876 
2877  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2878 
2879  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2880  {
2881  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2882 
2883  /* remove potential on-disk data, and deallocate this tx */
2884  ReorderBufferCleanupTXN(rb, txn);
2885  }
2886  else
2887  return;
2888  }
2889 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:232

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3197 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

3200 {
3201  ReorderBufferTXN *txn;
3202  MemoryContext oldcontext;
3203  ReorderBufferChange *change;
3204 
3205  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3206 
3207  oldcontext = MemoryContextSwitchTo(rb->context);
3208 
3209  /*
3210  * Collect all the invalidations under the top transaction so that we can
3211  * execute them all together. See comment atop this function
3212  */
3213  if (txn->toptxn)
3214  txn = txn->toptxn;
3215 
3216  Assert(nmsgs > 0);
3217 
3218  /* Accumulate invalidations. */
3219  if (txn->ninvalidations == 0)
3220  {
3221  txn->ninvalidations = nmsgs;
3223  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3224  memcpy(txn->invalidations, msgs,
3225  sizeof(SharedInvalidationMessage) * nmsgs);
3226  }
3227  else
3228  {
3231  (txn->ninvalidations + nmsgs));
3232 
3233  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3234  nmsgs * sizeof(SharedInvalidationMessage));
3235  txn->ninvalidations += nmsgs;
3236  }
3237 
3238  change = ReorderBufferGetChange(rb);
3240  change->data.inval.ninvalidations = nmsgs;
3241  change->data.inval.invalidations = (SharedInvalidationMessage *)
3242  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3243  memcpy(change->data.inval.invalidations, msgs,
3244  sizeof(SharedInvalidationMessage) * nmsgs);
3245 
3246  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3247 
3248  MemoryContextSwitchTo(oldcontext);
3249 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferTXN * toptxn
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3076 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

3078 {
3080 
3081  change->data.command_id = cid;
3083 
3084  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3085 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3161 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

3165 {
3167  ReorderBufferTXN *txn;
3168 
3169  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3170 
3171  change->data.tuplecid.node = node;
3172  change->data.tuplecid.tid = tid;
3173  change->data.tuplecid.cmin = cmin;
3174  change->data.tuplecid.cmax = cmax;
3175  change->data.tuplecid.combocid = combocid;
3176  change->lsn = lsn;
3177  change->txn = txn;
3179 
3180  dlist_push_tail(&txn->tuplecids, &change->node);
3181  txn->ntuplecids++;
3182 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@101 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3027 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

3029 {
3031 
3032  change->data.snapshot = snap;
3034 
3035  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3036 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 300 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

301 {
302  ReorderBuffer *buffer;
303  HASHCTL hash_ctl;
304  MemoryContext new_ctx;
305 
306  Assert(MyReplicationSlot != NULL);
307 
308  /* allocate memory in own context, to have better accountability */
310  "ReorderBuffer",
312 
313  buffer =
314  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
315 
316  memset(&hash_ctl, 0, sizeof(hash_ctl));
317 
318  buffer->context = new_ctx;
319 
320  buffer->change_context = SlabContextCreate(new_ctx,
321  "Change",
323  sizeof(ReorderBufferChange));
324 
325  buffer->txn_context = SlabContextCreate(new_ctx,
326  "TXN",
328  sizeof(ReorderBufferTXN));
329 
330  buffer->tup_context = GenerationContextCreate(new_ctx,
331  "Tuples",
333 
334  hash_ctl.keysize = sizeof(TransactionId);
335  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
336  hash_ctl.hcxt = buffer->context;
337 
338  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
340 
342  buffer->by_txn_last_txn = NULL;
343 
344  buffer->outbuf = NULL;
345  buffer->outbufsize = 0;
346  buffer->size = 0;
347 
348  buffer->spillTxns = 0;
349  buffer->spillCount = 0;
350  buffer->spillBytes = 0;
351  buffer->streamTxns = 0;
352  buffer->streamCount = 0;
353  buffer->streamBytes = 0;
354  buffer->totalTxns = 0;
355  buffer->totalBytes = 0;
356 
358 
359  dlist_init(&buffer->toplevel_by_lsn);
361 
362  /*
363  * Ensure there's no stale data from prior uses of this slot, in case some
364  * prior exit avoided calling ReorderBufferFree. Failure to do this can
365  * produce duplicated txns, and it's very cheap if there's nothing there.
366  */
368 
369  return buffer;
370 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:76
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
ReplicationSlotPersistentData data
Definition: slot.h:147
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:197
MemoryContext context
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:75
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:224
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define NameStr(name)
Definition: c.h:681
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1907 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1910 {
1911  if (streaming)
1912  rb->stream_change(rb, txn, relation, change);
1913  else
1914  rb->apply_change(rb, txn, relation, change);
1915 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1935 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1937 {
1938  if (streaming)
1939  rb->stream_message(rb, txn, change->lsn, true,
1940  change->data.msg.prefix,
1941  change->data.msg.message_size,
1942  change->data.msg.message);
1943  else
1944  rb->message(rb, txn, change->lsn, true,
1945  change->data.msg.prefix,
1946  change->data.msg.message_size,
1947  change->data.msg.message);
1948 }
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1921 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1924 {
1925  if (streaming)
1926  rb->stream_truncate(rb, txn, nrelations, relations, change);
1927  else
1928  rb->apply_truncate(rb, txn, nrelations, relations, change);
1929 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1011 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

1013 {
1014  ReorderBufferTXN *txn;
1015  ReorderBufferTXN *subtxn;
1016  bool new_top;
1017  bool new_sub;
1018 
1019  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1020  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1021 
1022  if (!new_sub)
1023  {
1024  if (rbtxn_is_known_subxact(subtxn))
1025  {
1026  /* already associated, nothing to do */
1027  return;
1028  }
1029  else
1030  {
1031  /*
1032  * We already saw this transaction, but initially added it to the
1033  * list of top-level txns. Now that we know it's not top-level,
1034  * remove it from there.
1035  */
1036  dlist_delete(&subtxn->node);
1037  }
1038  }
1039 
1040  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1041  subtxn->toplevel_xid = xid;
1042  Assert(subtxn->nsubtxns == 0);
1043 
1044  /* set the reference to top-level transaction */
1045  subtxn->toptxn = txn;
1046 
1047  /* add to subtransaction list */
1048  dlist_push_tail(&txn->subtxns, &subtxn->node);
1049  txn->nsubtxns++;
1050 
1051  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1052  ReorderBufferTransferSnapToParent(txn, subtxn);
1053 
1054  /* Verify LSN-ordering invariant */
1055  AssertTXNLsnOrder(rb);
1056 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1670 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1671 {
1672  dlist_iter iter;
1673  HASHCTL hash_ctl;
1674 
1676  return;
1677 
1678  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1679  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1680  hash_ctl.hcxt = rb->context;
1681 
1682  /*
1683  * create the hash with the exact number of to-be-stored tuplecids from
1684  * the start
1685  */
1686  txn->tuplecid_hash =
1687  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1689 
1690  dlist_foreach(iter, &txn->tuplecids)
1691  {
1694  bool found;
1695  ReorderBufferChange *change;
1696 
1697  change = dlist_container(ReorderBufferChange, node, iter.cur);
1698 
1700 
1701  /* be careful about padding */
1702  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1703 
1704  key.relnode = change->data.tuplecid.node;
1705 
1706  ItemPointerCopy(&change->data.tuplecid.tid,
1707  &key.tid);
1708 
1709  ent = (ReorderBufferTupleCidEnt *)
1711  (void *) &key,
1712  HASH_ENTER,
1713  &found);
1714  if (!found)
1715  {
1716  ent->cmin = change->data.tuplecid.cmin;
1717  ent->cmax = change->data.tuplecid.cmax;
1718  ent->combocid = change->data.tuplecid.combocid;
1719  }
1720  else
1721  {
1722  /*
1723  * Maybe we already saw this tuple before in this transaction, but
1724  * if so it must have the same cmin.
1725  */
1726  Assert(ent->cmin == change->data.tuplecid.cmin);
1727 
1728  /*
1729  * cmax may be initially invalid, but once set it can only grow,
1730  * and never become invalid again.
1731  */
1732  Assert((ent->cmax == InvalidCommandId) ||
1733  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1734  (change->data.tuplecid.cmax > ent->cmax)));
1735  ent->cmax = change->data.tuplecid.cmax;
1736  }
1737  }
1738 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
Size entrysize
Definition: hsearch.h:76
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
union ReorderBufferChange::@97 data
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext context
#define InvalidCommandId
Definition: c.h:604
Size keysize
Definition: hsearch.h:75
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
struct ReorderBufferChange::@97::@101 tuplecid
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3833 of file reorderbuffer.c.

References XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3834 {
3836  SnapBuild *builder = ctx->snapshot_builder;
3837 
3838  /* We can't start streaming unless a consistent state is reached. */
3840  return false;
3841 
3842  /*
3843  * We can't start streaming immediately even if the streaming is enabled
3844  * because we previously decoded this transaction and now just are
3845  * restarting.
3846  */
3847  if (ReorderBufferCanStream(rb) &&
3848  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3849  return true;
3850 
3851  return false;
3852 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:394
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 3101 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

3104 {
3105  Size sz;
3106  ReorderBufferTXN *txn;
3107  ReorderBufferTXN *toptxn;
3108 
3109  Assert(change->txn);
3110 
3111  /*
3112  * Ignore tuple CID changes, because those are not evicted when reaching
3113  * memory limit. So we just don't count them, because it might easily
3114  * trigger a pointless attempt to spill.
3115  */
3117  return;
3118 
3119  txn = change->txn;
3120 
3121  /*
3122  * Update the total size in top level as well. This is later used to
3123  * compute the decoding stats.
3124  */
3125  if (txn->toptxn != NULL)
3126  toptxn = txn->toptxn;
3127  else
3128  toptxn = txn;
3129 
3130  sz = ReorderBufferChangeSize(change);
3131 
3132  if (addition)
3133  {
3134  txn->size += sz;
3135  rb->size += sz;
3136 
3137  /* Update the total size in the top transaction. */
3138  toptxn->total_size += sz;
3139  }
3140  else
3141  {
3142  Assert((rb->size >= sz) && (txn->size >= sz));
3143  txn->size -= sz;
3144  rb->size -= sz;
3145 
3146  /* Update the total size in the top transaction. */
3147  toptxn->total_size -= sz;
3148  }
3149 
3150  Assert(txn->size <= rb->size);
3151 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferTXN * toptxn
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 3976 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

3977 {
3978  Size sz = sizeof(ReorderBufferChange);
3979 
3980  switch (change->action)
3981  {
3982  /* fall through these, they're all similar enough */
3987  {
3988  ReorderBufferTupleBuf *oldtup,
3989  *newtup;
3990  Size oldlen = 0;
3991  Size newlen = 0;
3992 
3993  oldtup = change->data.tp.oldtuple;
3994  newtup = change->data.tp.newtuple;
3995 
3996  if (oldtup)
3997  {
3998  sz += sizeof(HeapTupleData);
3999  oldlen = oldtup->tuple.t_len;
4000  sz += oldlen;
4001  }
4002 
4003  if (newtup)
4004  {
4005  sz += sizeof(HeapTupleData);
4006  newlen = newtup->tuple.t_len;
4007  sz += newlen;
4008  }
4009 
4010  break;
4011  }
4013  {
4014  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4015 
4016  sz += prefix_size + change->data.msg.message_size +
4017  sizeof(Size) + sizeof(Size);
4018 
4019  break;
4020  }
4022  {
4023  sz += sizeof(SharedInvalidationMessage) *
4024  change->data.inval.ninvalidations;
4025  break;
4026  }
4028  {
4029  Snapshot snap;
4030 
4031  snap = change->data.snapshot;
4032 
4033  sz += sizeof(SnapshotData) +
4034  sizeof(TransactionId) * snap->xcnt +
4035  sizeof(TransactionId) * snap->subxcnt;
4036 
4037  break;
4038  }
4040  {
4041  sz += sizeof(Oid) * change->data.truncate.nrelids;
4042 
4043  break;
4044  }
4049  /* ReorderBufferChange contains everything important */
4050  break;
4051  }
4052 
4053  return sz;
4054 }
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3452 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3453 {
3454  ReorderBufferTXN *txn;
3455 
3456  /* bail out if we haven't exceeded the memory limit */
3457  if (rb->size < logical_decoding_work_mem * 1024L)
3458  return;
3459 
3460  /*
3461  * Loop until we reach under the memory limit. One might think that just
3462  * by evicting the largest (sub)transaction we will come under the memory
3463  * limit based on assumption that the selected transaction is at least as
3464  * large as the most recent change (which caused us to go over the memory
3465  * limit). However, that is not true because a user can reduce the
3466  * logical_decoding_work_mem to a smaller value before the most recent
3467  * change.
3468  */
3469  while (rb->size >= logical_decoding_work_mem * 1024L)
3470  {
3471  /*
3472  * Pick the largest transaction (or subtransaction) and evict it from
3473  * memory by streaming, if possible. Otherwise, spill to disk.
3474  */
3476  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3477  {
3478  /* we know there has to be one, because the size is not zero */
3479  Assert(txn && !txn->toptxn);
3480  Assert(txn->total_size > 0);
3481  Assert(rb->size >= txn->total_size);
3482 
3483  ReorderBufferStreamTXN(rb, txn);
3484  }
3485  else
3486  {
3487  /*
3488  * Pick the largest transaction (or subtransaction) and evict it
3489  * from memory by serializing it to disk.
3490  */
3491  txn = ReorderBufferLargestTXN(rb);
3492 
3493  /* we know there has to be one, because the size is not zero */
3494  Assert(txn);
3495  Assert(txn->size > 0);
3496  Assert(rb->size >= txn->size);
3497 
3498  ReorderBufferSerializeTXN(rb, txn);
3499  }
3500 
3501  /*
3502  * After eviction, the transaction should have no entries in memory,
3503  * and should use 0 bytes for changes.
3504  */
3505  Assert(txn->size == 0);
3506  Assert(txn->nentries_mem == 0);
3507  }
3508 
3509  /* We must be under the memory limit now. */
3510  Assert(rb->size < logical_decoding_work_mem * 1024L);
3511 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4399 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

4400 {
4401  DIR *spill_dir;
4402  struct dirent *spill_de;
4403  struct stat statbuf;
4404  char path[MAXPGPATH * 2 + 12];
4405 
4406  sprintf(path, "pg_replslot/%s", slotname);
4407 
4408  /* we're only handling directories here, skip if it's not ours */
4409  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4410  return;
4411 
4412  spill_dir = AllocateDir(path);
4413  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4414  {
4415  /* only look at names that can be ours */
4416  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4417  {
4418  snprintf(path, sizeof(path),
4419  "pg_replslot/%s/%s", slotname,
4420  spill_de->d_name);
4421 
4422  if (unlink(path) != 0)
4423  ereport(ERROR,
4425  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4426  path, slotname)));
4427  }
4428  }
4429  FreeDir(spill_dir);
4430 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2759
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:218
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2678
#define ereport(elevel,...)
Definition: elog.h:157
#define S_ISDIR(m)
Definition: win32_port.h:316
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:909
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:216
int FreeDir(DIR *dir)
Definition: fd.c:2796

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1447 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

1448 {
1449  bool found;
1450  dlist_mutable_iter iter;
1451 
1452  /* cleanup subtransactions & their changes */
1453  dlist_foreach_modify(iter, &txn->subtxns)
1454  {
1455  ReorderBufferTXN *subtxn;
1456 
1457  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1458 
1459  /*
1460  * Subtransactions are always associated to the toplevel TXN, even if
1461  * they originally were happening inside another subtxn, so we won't
1462  * ever recurse more than one level deep here.
1463  */
1464  Assert(rbtxn_is_known_subxact(subtxn));
1465  Assert(subtxn->nsubtxns == 0);
1466 
1467  ReorderBufferCleanupTXN(rb, subtxn);
1468  }
1469 
1470  /* cleanup changes in the txn */
1471  dlist_foreach_modify(iter, &txn->changes)
1472  {
1473  ReorderBufferChange *change;
1474 
1475  change = dlist_container(ReorderBufferChange, node, iter.cur);
1476 
1477  /* Check we're not mixing changes from different transactions. */
1478  Assert(change->txn == txn);
1479 
1480  ReorderBufferReturnChange(rb, change, true);
1481  }
1482 
1483  /*
1484  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1485  * They are always stored in the toplevel transaction.
1486  */
1487  dlist_foreach_modify(iter, &txn->tuplecids)
1488  {
1489  ReorderBufferChange *change;
1490 
1491  change = dlist_container(ReorderBufferChange, node, iter.cur);
1492 
1493  /* Check we're not mixing changes from different transactions. */
1494  Assert(change->txn == txn);
1496 
1497  ReorderBufferReturnChange(rb, change, true);
1498  }
1499 
1500  /*
1501  * Cleanup the base snapshot, if set.
1502  */
1503  if (txn->base_snapshot != NULL)
1504  {
1507  }
1508 
1509  /*
1510  * Cleanup the snapshot for the last streamed run.
1511  */
1512  if (txn->snapshot_now != NULL)
1513  {
1514  Assert(rbtxn_is_streamed(txn));
1516  }
1517 
1518  /*
1519  * Remove TXN from its containing list.
1520  *
1521  * Note: if txn is known as subxact, we are deleting the TXN from its
1522  * parent's list of known subxacts; this leaves the parent's nsubxacts
1523  * count too high, but we don't care. Otherwise, we are deleting the TXN
1524  * from the LSN-ordered list of toplevel TXNs.
1525  */
1526  dlist_delete(&txn->node);
1527 
1528  /* now remove reference from buffer */
1529  hash_search(rb->by_txn,
1530  (void *) &txn->xid,
1531  HASH_REMOVE,
1532  &found);
1533  Assert(found);
1534 
1535  /* remove entries spilled to disk */
1536  if (rbtxn_is_serialized(txn))
1537  ReorderBufferRestoreCleanup(rb, txn);
1538 
1539  /* deallocate */
1540  ReorderBufferReturnTXN(rb, txn);
1541 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:804
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:418
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2629 of file reorderbuffer.c.

References InvalidXLogRecPtr, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2633 {
2634  ReorderBufferTXN *txn;
2635 
2636  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2637  false);
2638 
2639  /* unknown transaction, nothing to replay */
2640  if (txn == NULL)
2641  return;
2642 
2643  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2644  origin_id, origin_lsn);
2645 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1131 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

1134 {
1135  ReorderBufferTXN *subtxn;
1136 
1137  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1138  InvalidXLogRecPtr, false);
1139 
1140  /*
1141  * No need to do anything if that subtxn didn't contain any changes
1142  */
1143  if (!subtxn)
1144  return;
1145 
1146  subtxn->final_lsn = commit_lsn;
1147  subtxn->end_lsn = end_lsn;
1148 
1149  /*
1150  * Assign this subxact as a child of the toplevel xact (no-op if already
1151  * done.)
1152  */
1153  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1154 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1746 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1748 {
1749  Snapshot snap;
1750  dlist_iter iter;
1751  int i = 0;
1752  Size size;
1753 
1754  size = sizeof(SnapshotData) +
1755  sizeof(TransactionId) * orig_snap->xcnt +
1756  sizeof(TransactionId) * (txn->nsubtxns + 1);
1757 
1758  snap = MemoryContextAllocZero(rb->context, size);
1759  memcpy(snap, orig_snap, sizeof(SnapshotData));
1760 
1761  snap->copied = true;
1762  snap->active_count = 1; /* mark as active so nobody frees it */
1763  snap->regd_count = 0;
1764  snap->xip = (TransactionId *) (snap + 1);
1765 
1766  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1767 
1768  /*
1769  * snap->subxip contains all txids that belong to our transaction which we
1770  * need to check via cmin/cmax. That's why we store the toplevel
1771  * transaction in there as well.
1772  */
1773  snap->subxip = snap->xip + snap->xcnt;
1774  snap->subxip[i++] = txn->xid;
1775 
1776  /*
1777  * subxcnt isn't decreased when subtransactions abort, so count manually.
1778  * Since it's an upper boundary it is safe to use it for the allocation
1779  * above.
1780  */
1781  snap->subxcnt = 1;
1782 
1783  dlist_foreach(iter, &txn->subtxns)
1784  {
1785  ReorderBufferTXN *sub_txn;
1786 
1787  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1788  snap->subxip[i++] = sub_txn->xid;
1789  snap->subxcnt++;
1790  }
1791 
1792  /* sort so we can bsearch() later */
1793  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1794 
1795  /* store the specified current CommandId */
1796  snap->curcid = cid;
1797 
1798  return snap;
1799 }
uint32 TransactionId
Definition: c.h:587
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:540
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:504
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3256 of file reorderbuffer.c.

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

3257 {
3258  int i;
3259 
3260  for (i = 0; i < nmsgs; i++)
3262 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:563
int i

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2735 of file reorderbuffer.c.

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

2740 {
2741  ReorderBufferTXN *txn;
2742  XLogRecPtr prepare_end_lsn;
2743  TimestampTz prepare_time;
2744 
2745  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2746 
2747  /* unknown transaction, nothing to do */
2748  if (txn == NULL)
2749  return;
2750 
2751  /*
2752  * By this time the txn has the prepare record information, remember it to
2753  * be later used for rollback.
2754  */
2755  prepare_end_lsn = txn->end_lsn;
2756  prepare_time = txn->xact_time.prepare_time;
2757 
2758  /* add the gid in the txn */
2759  txn->gid = pstrdup(gid);
2760 
2761  /*
2762  * It is possible that this transaction is not decoded at prepare time
2763  * either because by that time we didn't have a consistent snapshot, or
2764  * two_phase was not enabled, or it was decoded earlier but we have
2765  * restarted. We only need to send the prepare if it was not decoded
2766  * earlier. We don't need to decode the xact for aborts if it is not done
2767  * already.
2768  */
2769  if ((txn->final_lsn < two_phase_at) && is_commit)
2770  {
2771  txn->txn_flags |= RBTXN_PREPARE;
2772 
2773  /*
2774  * The prepare info must have been updated in txn even if we skip
2775  * prepare.
2776  */
2778 
2779  /*
2780  * By this time the txn has the prepare record information and it is
2781  * important to use that so that downstream gets the accurate
2782  * information. If instead, we have passed commit information here
2783  * then downstream can behave as it has already replayed commit
2784  * prepared after the restart.
2785  */
2786  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2787  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2788  }
2789 
2790  txn->final_lsn = commit_lsn;
2791  txn->end_lsn = end_lsn;
2792  txn->xact_time.commit_time = commit_time;
2793  txn->origin_id = origin_id;
2794  txn->origin_lsn = origin_lsn;
2795 
2796  if (is_commit)
2797  rb->commit_prepared(rb, txn, commit_lsn);
2798  else
2799  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2800 
2801  /* cleanup: make sure there's no cache pollution */
2803  txn->invalidations);
2804  ReorderBufferCleanupTXN(rb, txn);
2805 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferTXN::@103 xact_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
ReorderBufferRollbackPreparedCB rollback_prepared
SharedInvalidationMessage * invalidations
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2905 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2906 {
2907  ReorderBufferTXN *txn;
2908 
2909  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2910  false);
2911 
2912  /* unknown, nothing to forget */
2913  if (txn == NULL)
2914  return;
2915 
2916  /* For streamed transactions notify the remote node about the abort. */
2917  if (rbtxn_is_streamed(txn))
2918  rb->stream_abort(rb, txn, lsn);
2919 
2920  /* cosmetic... */
2921  txn->final_lsn = lsn;
2922 
2923  /*
2924  * Process cache invalidation messages if there are any. Even if we're not
2925  * interested in the transaction's contents, it could have manipulated the
2926  * catalog and we need to update the caches according to that.
2927  */
2928  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2930  txn->invalidations);
2931  else
2932  Assert(txn->ninvalidations == 0);
2933 
2934  /* remove potential on-disk data, and deallocate */
2935  ReorderBufferCleanupTXN(rb, txn);
2936 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 376 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

377 {
378  MemoryContext context = rb->context;
379 
380  /*
381  * We free separately allocated data by entirely scrapping reorderbuffer's
382  * memory context.
383  */
384  MemoryContextDelete(context);
385 
386  /* Free disk space used by unconsumed reorder buffers */
388 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
ReplicationSlotPersistentData data
Definition: slot.h:147
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:681
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1805 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1806 {
1807  if (snap->copied)
1808  pfree(snap);
1809  else
1811 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1169
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:418

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 956 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

957 {
958  ReorderBufferTXN *txn;
959 
960  AssertTXNLsnOrder(rb);
961 
963  return NULL;
964 
966 
969  return txn;
970 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 984 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

985 {
986  ReorderBufferTXN *txn;
987 
988  AssertTXNLsnOrder(rb);
989 
991  return InvalidTransactionId;
992 
993  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
995  return txn->base_snapshot->xmin;
996 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 577 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

578 {
579  Oid *relids;
580  Size alloc_len;
581 
582  alloc_len = sizeof(Oid) * nrelids;
583 
584  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
585 
586  return relids;
587 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 541 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

542 {
543  ReorderBufferTupleBuf *tuple;
544  Size alloc_len;
545 
546  alloc_len = tuple_len + SizeofHeapTupleHeader;
547 
548  tuple = (ReorderBufferTupleBuf *)
550  sizeof(ReorderBufferTupleBuf) +
551  MAXIMUM_ALIGNOF + alloc_len);
552  tuple->alloc_tuple_size = alloc_len;
553  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
554 
555  return tuple;
556 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 394 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

395 {
396  ReorderBufferTXN *txn;
397 
398  txn = (ReorderBufferTXN *)
400 
401  memset(txn, 0, sizeof(ReorderBufferTXN));
402 
403  dlist_init(&txn->changes);
404  dlist_init(&txn->tuplecids);
405  dlist_init(&txn->subtxns);
406 
407  /* InvalidCommandId is not zero, so set it explicitly */
409  txn->output_plugin_private = NULL;
410 
411  return txn;
412 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:604
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
void * output_plugin_private
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2978 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), ReorderBufferForget(), and ReorderBufferInvalidate().

2980 {
2981  bool use_subtxn = IsTransactionOrTransactionBlock();
2982  int i;
2983 
2984  if (use_subtxn)
2985  BeginInternalSubTransaction("replay");
2986 
2987  /*
2988  * Force invalidations to happen outside of a valid transaction - that way
2989  * entries will just be marked as invalid without accessing the catalog.
2990  * That's advantageous because we don't need to setup the full state
2991  * necessary for catalog access.
2992  */
2993  if (use_subtxn)
2995 
2996  for (i = 0; i < ninvalidations; i++)
2997  LocalExecuteInvalidationMessage(&invalidations[i]);
2998 
2999  if (use_subtxn)
3001 }
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:563
int i

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2947 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2948 {
2949  ReorderBufferTXN *txn;
2950 
2951  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2952  false);
2953 
2954  /* unknown, nothing to do */
2955  if (txn == NULL)
2956  return;
2957 
2958  /*
2959  * Process cache invalidation messages if there are any. Even if we're not
2960  * interested in the transaction's contents, it could have manipulated the
2961  * catalog and we need to update the caches according to that.
2962  */
2963  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2965  txn->invalidations);
2966  else
2967  Assert(txn->ninvalidations == 0);
2968 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1173 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1174 {
1176  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1177  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1178 
1179  if (pos_a < pos_b)
1180  return 1;
1181  else if (pos_a == pos_b)
1182  return 0;
1183  return -1;
1184 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:516
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1416 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1418 {
1419  int32 off;
1420 
1421  for (off = 0; off < state->nr_txns; off++)
1422  {
1423  if (state->entries[off].file.vfd != -1)
1424  FileClose(state->entries[off].file.vfd);
1425  }
1426 
1427  /* free memory we might have "leaked" in the last *Next call */
1428  if (!dlist_is_empty(&state->old_change))
1429  {
1430  ReorderBufferChange *change;
1431 
1432  change = dlist_container(ReorderBufferChange, node,
1433  dlist_pop_head_node(&state->old_change));
1434  ReorderBufferReturnChange(rb, change, true);
1435  Assert(dlist_is_empty(&state->old_change));
1436  }
1437 
1438  binaryheap_free(state->heap);
1439  pfree(state);
1440 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
void FileClose(File file)
Definition: fd.c:1917
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1196 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1198 {
1199  Size nr_txns = 0;
1201  dlist_iter cur_txn_i;
1202  int32 off;
1203 
1204  *iter_state = NULL;
1205 
1206  /* Check ordering of changes in the toplevel transaction. */
1207  AssertChangeLsnOrder(txn);
1208 
1209  /*
1210  * Calculate the size of our heap: one element for every transaction that
1211  * contains changes. (Besides the transactions already in the reorder
1212  * buffer, we count the one we were directly passed.)
1213  */
1214  if (txn->nentries > 0)
1215  nr_txns++;
1216 
1217  dlist_foreach(cur_txn_i, &txn->subtxns)
1218  {
1219  ReorderBufferTXN *cur_txn;
1220 
1221  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1222 
1223  /* Check ordering of changes in this subtransaction. */
1224  AssertChangeLsnOrder(cur_txn);
1225 
1226  if (cur_txn->nentries > 0)
1227  nr_txns++;
1228  }
1229 
1230  /* allocate iteration state */
1231  state = (ReorderBufferIterTXNState *)
1233  sizeof(ReorderBufferIterTXNState) +
1234  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1235 
1236  state->nr_txns = nr_txns;
1237  dlist_init(&state->old_change);
1238 
1239  for (off = 0; off < state->nr_txns; off++)
1240  {
1241  state->entries[off].file.vfd = -1;
1242  state->entries[off].segno = 0;
1243  }
1244 
1245  /* allocate heap */
1246  state->heap = binaryheap_allocate(state->nr_txns,
1248  state);
1249 
1250  /* Now that the state fields are initialized, it is safe to return it. */
1251  *iter_state = state;
1252 
1253  /*
1254  * Now insert items into the binary heap, in an unordered fashion. (We
1255  * will run a heap assembly step at the end; this is more efficient.)
1256  */
1257 
1258  off = 0;
1259 
1260  /* add toplevel transaction if it contains changes */
1261  if (txn->nentries > 0)
1262  {
1263  ReorderBufferChange *cur_change;
1264 
1265  if (rbtxn_is_serialized(txn))
1266  {
1267  /* serialize remaining changes */
1268  ReorderBufferSerializeTXN(rb, txn);
1269  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1270  &state->entries[off].segno);
1271  }
1272 
1273  cur_change = dlist_head_element(ReorderBufferChange, node,
1274  &txn->changes);
1275 
1276  state->entries[off].lsn = cur_change->lsn;
1277  state->entries[off].change = cur_change;
1278  state->entries[off].txn = txn;
1279 
1281  }
1282 
1283  /* add subtransactions if they contain changes */
1284  dlist_foreach(cur_txn_i, &txn->subtxns)
1285  {
1286  ReorderBufferTXN *cur_txn;
1287 
1288  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1289 
1290  if (cur_txn->nentries > 0)
1291  {
1292  ReorderBufferChange *cur_change;
1293 
1294  if (rbtxn_is_serialized(cur_txn))
1295  {
1296  /* serialize remaining changes */
1297  ReorderBufferSerializeTXN(rb, cur_txn);
1298  ReorderBufferRestoreChanges(rb, cur_txn,
1299  &state->entries[off].file,
1300  &state->entries[off].segno);
1301  }
1302  cur_change = dlist_head_element(ReorderBufferChange, node,
1303  &cur_txn->changes);
1304 
1305  state->entries[off].lsn = cur_change->lsn;
1306  state->entries[off].change = cur_change;
1307  state->entries[off].txn = cur_txn;
1308 
1310  }
1311  }
1312 
1313  /* assemble a valid binary heap */
1314  binaryheap_build(state->heap);
1315 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:429
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:317
size_t Size
Definition: c.h:540
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:523
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1324 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1325 {
1326  ReorderBufferChange *change;
1328  int32 off;
1329 
1330  /* nothing there anymore */
1331  if (state->heap->bh_size == 0)
1332  return NULL;
1333 
1334  off = DatumGetInt32(binaryheap_first(state->heap));
1335  entry = &state->entries[off];
1336 
1337  /* free memory we might have "leaked" in the previous *Next call */
1338  if (!dlist_is_empty(&state->old_change))
1339  {
1340  change = dlist_container(ReorderBufferChange, node,
1341  dlist_pop_head_node(&state->old_change));
1342  ReorderBufferReturnChange(rb, change, true);
1343  Assert(dlist_is_empty(&state->old_change));
1344  }
1345 
1346  change = entry->change;
1347 
1348  /*
1349  * update heap with information about which transaction has the next
1350  * relevant change in LSN order
1351  */
1352 
1353  /* there are in-memory changes */
1354  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1355  {
1356  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1357  ReorderBufferChange *next_change =
1358  dlist_container(ReorderBufferChange, node, next);
1359 
1360  /* txn stays the same */
1361  state->entries[off].lsn = next_change->lsn;
1362  state->entries[off].change = next_change;
1363 
1365  return change;
1366  }
1367 
1368  /* try to load changes from disk */
1369  if (entry->txn->nentries != entry->txn->nentries_mem)
1370  {
1371  /*
1372  * Ugly: restoring changes will reuse *Change records, thus delete the
1373  * current one from the per-tx list and only free in the next call.
1374  */
1375  dlist_delete(&change->node);
1376  dlist_push_tail(&state->old_change, &change->node);
1377 
1378  /*
1379  * Update the total bytes processed by the txn for which we are
1380  * releasing the current set of changes and restoring the new set of
1381  * changes.
1382  */
1383  rb->totalBytes += entry->txn->size;
1384  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1385  &state->entries[off].segno))
1386  {
1387  /* successfully restored changes from disk */
1388  ReorderBufferChange *next_change =
1390  &entry->txn->changes);
1391 
1392  elog(DEBUG2, "restored %u/%u changes from disk",
1393  (uint32) entry->txn->nentries_mem,
1394  (uint32) entry->txn->nentries);
1395 
1396  Assert(entry->txn->nentries_mem);
1397  /* txn stays the same */
1398  state->entries[off].lsn = next_change->lsn;
1399  state->entries[off].change = next_change;
1401 
1402  return change;
1403  }
1404  }
1405 
1406  /* ok, no changes there anymore, remove */
1407  binaryheap_remove_first(state->heap);
1408 
1409  return change;
1410 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:516
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:429
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:523
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:232
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3412 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

3413 {
3414  dlist_iter iter;
3415  Size largest_size = 0;
3416  ReorderBufferTXN *largest = NULL;
3417 
3418  /* Find the largest top-level transaction having a base snapshot. */
3420  {
3421  ReorderBufferTXN *txn;
3422 
3423  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3424 
3425  /* must not be a subtxn */
3427  /* base_snapshot must be set */
3428  Assert(txn->base_snapshot != NULL);
3429 
3430  if ((largest == NULL || txn->total_size > largest_size) &&
3431  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3432  {
3433  largest = txn;
3434  largest_size = txn->total_size;
3435  }
3436  }
3437 
3438  return largest;
3439 }
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define rbtxn_has_partial_change(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
dlist_head txns_by_base_snapshot_lsn
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3365 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3366 {
3367  HASH_SEQ_STATUS hash_seq;
3369  ReorderBufferTXN *largest = NULL;
3370 
3371  hash_seq_init(&hash_seq, rb->by_txn);
3372  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3373  {
3374  ReorderBufferTXN *txn = ent->txn;
3375 
3376  /* if the current transaction is larger, remember it */
3377  if ((!largest) || (txn->size > largest->size))
3378  largest = txn;
3379  }
3380 
3381  Assert(largest);
3382  Assert(largest->size > 0);
3383  Assert(largest->size <= rb->size);
3384 
3385  return largest;
3386 }
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
ReorderBufferTXN * txn

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2698 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

2700 {
2701  ReorderBufferTXN *txn;
2702 
2703  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2704  false);
2705 
2706  /* unknown transaction, nothing to replay */
2707  if (txn == NULL)
2708  return;
2709 
2710  txn->txn_flags |= RBTXN_PREPARE;
2711  txn->gid = pstrdup(gid);
2712 
2713  /* The prepare info must have been updated in txn by now. */
2715 
2716  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2717  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2718 
2719  /*
2720  * We send the prepare for the concurrently aborted xacts so that later
2721  * when rollback prepared is decoded and sent, the downstream should be
2722  * able to rollback such a xact. See comments atop DecodePrepare.
2723  *
2724  * Note, for the concurrent_abort + streaming case a stream_prepare was
2725  * already sent within the ReorderBufferReplay call above.
2726  */
2727  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2728  rb->prepare(rb, txn, txn->final_lsn);
2729 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TimestampTz prepare_time
ReorderBufferPrepareCB prepare

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 693 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferChange::tp, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

696 {
697  ReorderBufferTXN *toptxn;
698 
699  /*
700  * The partial changes need to be processed only while streaming
701  * in-progress transactions.
702  */
703  if (!ReorderBufferCanStream(rb))
704  return;
705 
706  /* Get the top transaction. */
707  if (txn->toptxn != NULL)
708  toptxn = txn->toptxn;
709  else
710  toptxn = txn;
711 
712  /*
713  * Indicate a partial change for toast inserts. The change will be
714  * considered as complete once we get the insert or update on the main
715  * table and we are sure that the pending toast chunks are not required
716  * anymore.
717  *
718  * If we allow streaming when there are pending toast chunks then such
719  * chunks won't be released till the insert (multi_insert) is complete and
720  * we expect the txn to have streamed all changes after streaming. This
721  * restriction is mainly to ensure the correctness of streamed
722  * transactions and it doesn't seem worth uplifting such a restriction
723  * just to allow this case because anyway we will stream the transaction
724  * once such an insert is complete.
725  */
726  if (toast_insert)
728  else if (rbtxn_has_partial_change(toptxn) &&
729  IsInsertOrUpdate(change->action) &&
730  change->data.tp.clear_toast_afterwards)
732 
733  /*
734  * Indicate a partial change for speculative inserts. The change will be
735  * considered as complete once we get the speculative confirm or abort
736  * token.
737  */
738  if (IsSpecInsert(change->action))
740  else if (rbtxn_has_partial_change(toptxn) &&
741  IsSpecConfirmOrAbort(change->action))
743 
744  /*
745  * Stream the transaction if it is serialized before and the changes are
746  * now complete in the top-level transaction.
747  *
748  * The reason for doing the streaming of such a transaction as soon as we
749  * get the complete change for it is that previously it would have reached
750  * the memory threshold and wouldn't get streamed because of incomplete
751  * changes. Delaying such transactions would increase apply lag for them.
752  */
754  !(rbtxn_has_partial_change(toptxn)) &&
755  rbtxn_is_serialized(txn))
756  ReorderBufferStreamTXN(rb, toptxn);
757 }
#define rbtxn_has_partial_change(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
union ReorderBufferChange::@97 data
#define RBTXN_HAS_PARTIAL_CHANGE
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferChange::@97::@98 tp
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2019 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

2024 {
2025  bool using_subtxn;
2027  ReorderBufferIterTXNState *volatile iterstate = NULL;
2028  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2029  ReorderBufferChange *volatile specinsert = NULL;
2030  volatile bool stream_started = false;
2031  ReorderBufferTXN *volatile curtxn = NULL;
2032 
2033  /* build data to be able to lookup the CommandIds of catalog tuples */
2035 
2036  /* setup the initial snapshot */
2037  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2038 
2039  /*
2040  * Decoding needs access to syscaches et al., which in turn use
2041  * heavyweight locks and such. Thus we need to have enough state around to
2042  * keep track of those. The easiest way is to simply use a transaction
2043  * internally. That also allows us to easily enforce that nothing writes
2044  * to the database by checking for xid assignments.
2045  *
2046  * When we're called via the SQL SRF there's already a transaction
2047  * started, so start an explicit subtransaction there.
2048  */
2049  using_subtxn = IsTransactionOrTransactionBlock();
2050 
2051  PG_TRY();
2052  {
2053  ReorderBufferChange *change;
2054 
2055  if (using_subtxn)
2056  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2057  else
2059 
2060  /*
2061  * We only need to send begin/begin-prepare for non-streamed
2062  * transactions.
2063  */
2064  if (!streaming)
2065  {
2066  if (rbtxn_prepared(txn))
2067  rb->begin_prepare(rb, txn);
2068  else
2069  rb->begin(rb, txn);
2070  }
2071 
2072  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2073  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2074  {
2075  Relation relation = NULL;
2076  Oid reloid;
2077 
2078  /*
2079  * We can't call start stream callback before processing first
2080  * change.
2081  */
2082  if (prev_lsn == InvalidXLogRecPtr)
2083  {
2084  if (streaming)
2085  {
2086  txn->origin_id = change->origin_id;
2087  rb->stream_start(rb, txn, change->lsn);
2088  stream_started = true;
2089  }
2090  }
2091 
2092  /*
2093  * Enforce correct ordering of changes, merged from multiple
2094  * subtransactions. The changes may have the same LSN due to
2095  * MULTI_INSERT xlog records.
2096  */
2097  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2098 
2099  prev_lsn = change->lsn;
2100 
2101  /*
2102  * Set the current xid to detect concurrent aborts. This is
2103  * required for the cases when we decode the changes before the
2104  * COMMIT record is processed.
2105  */
2106  if (streaming || rbtxn_prepared(change->txn))
2107  {
2108  curtxn = change->txn;
2109  SetupCheckXidLive(curtxn->xid);
2110  }
2111 
2112  switch (change->action)
2113  {
2115 
2116  /*
2117  * Confirmation for speculative insertion arrived. Simply
2118  * use as a normal record. It'll be cleaned up at the end
2119  * of INSERT processing.
2120  */
2121  if (specinsert == NULL)
2122  elog(ERROR, "invalid ordering of speculative insertion changes");
2123  Assert(specinsert->data.tp.oldtuple == NULL);
2124  change = specinsert;
2126 
2127  /* intentionally fall through */
2131  Assert(snapshot_now);
2132 
2133  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2134  change->data.tp.relnode.relNode);
2135 
2136  /*
2137  * Mapped catalog tuple without data, emitted while
2138  * catalog table was in the process of being rewritten. We
2139  * can fail to look up the relfilenode, because the
2140  * relmapper has no "historic" view, in contrast to the
2141  * normal catalog during decoding. Thus repeated rewrites
2142  * can cause a lookup failure. That's OK because we do not
2143  * decode catalog changes anyway. Normally such tuples
2144  * would be skipped over below, but we can't identify
2145  * whether the table should be logically logged without
2146  * mapping the relfilenode to the oid.
2147  */
2148  if (reloid == InvalidOid &&
2149  change->data.tp.newtuple == NULL &&
2150  change->data.tp.oldtuple == NULL)
2151  goto change_done;
2152  else if (reloid == InvalidOid)
2153  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2154  relpathperm(change->data.tp.relnode,
2155  MAIN_FORKNUM));
2156 
2157  relation = RelationIdGetRelation(reloid);
2158 
2159  if (!RelationIsValid(relation))
2160  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2161  reloid,
2162  relpathperm(change->data.tp.relnode,
2163  MAIN_FORKNUM));
2164 
2165  if (!RelationIsLogicallyLogged(relation))
2166  goto change_done;
2167 
2168  /*
2169  * Ignore temporary heaps created during DDL unless the
2170  * plugin has asked for them.
2171  */
2172  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2173  goto change_done;
2174 
2175  /*
2176  * For now ignore sequence changes entirely. Most of the
2177  * time they don't log changes using records we
2178  * understand, so it doesn't make sense to handle the few
2179  * cases we do.
2180  */
2181  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2182  goto change_done;
2183 
2184  /* user-triggered change */
2185  if (!IsToastRelation(relation))
2186  {
2187  ReorderBufferToastReplace(rb, txn, relation, change);
2188  ReorderBufferApplyChange(rb, txn, relation, change,
2189  streaming);
2190 
2191  /*
2192  * Only clear reassembled toast chunks if we're sure
2193  * they're not required anymore. The creator of the
2194  * tuple tells us.
2195  */
2196  if (change->data.tp.clear_toast_afterwards)
2197  ReorderBufferToastReset(rb, txn);
2198  }
2199  /* we're not interested in toast deletions */
2200  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2201  {
2202  /*
2203  * Need to reassemble the full toasted Datum in
2204  * memory, to ensure the chunks don't get reused till
2205  * we're done remove it from the list of this
2206  * transaction's changes. Otherwise it will get
2207  * freed/reused while restoring spooled data from
2208  * disk.
2209  */
2210  Assert(change->data.tp.newtuple != NULL);
2211 
2212  dlist_delete(&change->node);
2213  ReorderBufferToastAppendChunk(rb, txn, relation,
2214  change);
2215  }
2216 
2217  change_done:
2218 
2219  /*
2220  * If speculative insertion was confirmed, the record
2221  * isn't needed anymore.
2222  */
2223  if (specinsert != NULL)
2224  {
2225  ReorderBufferReturnChange(rb, specinsert, true);
2226  specinsert = NULL;
2227  }
2228 
2229  if (RelationIsValid(relation))
2230  {
2231  RelationClose(relation);
2232  relation = NULL;
2233  }
2234  break;
2235 
2237 
2238  /*
2239  * Speculative insertions are dealt with by delaying the
2240  * processing of the insert until the confirmation record
2241  * arrives. For that we simply unlink the record from the
2242  * chain, so it does not get freed/reused while restoring
2243  * spooled data from disk.
2244  *
2245  * This is safe in the face of concurrent catalog changes
2246  * because the relevant relation can't be changed between
2247  * speculative insertion and confirmation due to
2248  * CheckTableNotInUse() and locking.
2249  */
2250 
2251  /* clear out a pending (and thus failed) speculation */
2252  if (specinsert != NULL)
2253  {
2254  ReorderBufferReturnChange(rb, specinsert, true);
2255  specinsert = NULL;
2256  }
2257 
2258  /* and memorize the pending insertion */
2259  dlist_delete(&change->node);
2260  specinsert = change;
2261  break;
2262 
2264 
2265  /*
2266  * Abort for speculative insertion arrived. So cleanup the
2267  * specinsert tuple and toast hash.
2268  *
2269  * Note that we get the spec abort change for each toast
2270  * entry but we need to perform the cleanup only the first
2271  * time we get it for the main table.
2272  */
2273  if (specinsert != NULL)
2274  {
2275  /*
2276  * We must clean the toast hash before processing a
2277  * completely new tuple to avoid confusion about the
2278  * previous tuple's toast chunks.
2279  */
2280  Assert(change->data.tp.clear_toast_afterwards);
2281  ReorderBufferToastReset(rb, txn);
2282 
2283  /* We don't need this record anymore. */
2284  ReorderBufferReturnChange(rb, specinsert, true);
2285  specinsert = NULL;
2286  }
2287  break;
2288 
2290  {
2291  int i;
2292  int nrelids = change->data.truncate.nrelids;
2293  int nrelations = 0;
2294  Relation *relations;
2295 
2296  relations = palloc0(nrelids * sizeof(Relation));
2297  for (i = 0; i < nrelids; i++)
2298  {
2299  Oid relid = change->data.truncate.relids[i];
2300  Relation relation;
2301 
2302  relation = RelationIdGetRelation(relid);
2303 
2304  if (!RelationIsValid(relation))
2305  elog(ERROR, "could not open relation with OID %u", relid);
2306 
2307  if (!RelationIsLogicallyLogged(relation))
2308  continue;
2309 
2310  relations[nrelations++] = relation;
2311  }
2312 
2313  /* Apply the truncate. */
2314  ReorderBufferApplyTruncate(rb, txn, nrelations,
2315  relations, change,
2316  streaming);
2317 
2318  for (i = 0; i < nrelations; i++)
2319  RelationClose(relations[i]);
2320 
2321  break;
2322  }
2323 
2325  ReorderBufferApplyMessage(rb, txn, change, streaming);
2326  break;
2327 
2329  /* Execute the invalidation messages locally */
2331  change->data.inval.ninvalidations,
2332  change->data.inval.invalidations);
2333  break;
2334 
2336  /* get rid of the old */
2337  TeardownHistoricSnapshot(false);
2338 
2339  if (snapshot_now->copied)
2340  {
2341  ReorderBufferFreeSnap(rb, snapshot_now);
2342  snapshot_now =
2343  ReorderBufferCopySnap(rb, change->data.snapshot,
2344  txn, command_id);
2345  }
2346 
2347  /*
2348  * Restored from disk, need to be careful not to double
2349  * free. We could introduce refcounting for that, but for
2350  * now this seems infrequent enough not to care.
2351  */
2352  else if (change->data.snapshot->copied)
2353  {
2354  snapshot_now =
2355  ReorderBufferCopySnap(rb, change->data.snapshot,
2356  txn, command_id);
2357  }
2358  else
2359  {
2360  snapshot_now = change->data.snapshot;
2361  }
2362 
2363  /* and continue with the new one */
2364  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2365  break;
2366 
2368  Assert(change->data.command_id != InvalidCommandId);
2369 
2370  if (command_id < change->data.command_id)
2371  {
2372  command_id = change->data.command_id;
2373 
2374  if (!snapshot_now->copied)
2375  {
2376  /* we don't use the global one anymore */
2377  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2378  txn, command_id);
2379  }
2380 
2381  snapshot_now->curcid = command_id;
2382 
2383  TeardownHistoricSnapshot(false);
2384  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2385  }
2386 
2387  break;
2388 
2390  elog(ERROR, "tuplecid value in changequeue");
2391  break;
2392  }
2393  }
2394 
2395  /* speculative insertion record must be freed by now */
2396  Assert(!specinsert);
2397 
2398  /* clean up the iterator */
2399  ReorderBufferIterTXNFinish(rb, iterstate);
2400  iterstate = NULL;
2401 
2402  /*
2403  * Update total transaction count and total bytes processed by the
2404  * transaction and its subtransactions. Ensure to not count the
2405  * streamed transaction multiple times.
2406  *
2407  * Note that the statistics computation has to be done after
2408  * ReorderBufferIterTXNFinish as it releases the serialized change
2409  * which we have already accounted in ReorderBufferIterTXNNext.
2410  */
2411  if (!rbtxn_is_streamed(txn))
2412  rb->totalTxns++;
2413 
2414  rb->totalBytes += txn->total_size;
2415 
2416  /*
2417  * Done with current changes, send the last message for this set of
2418  * changes depending upon streaming mode.
2419  */
2420  if (streaming)
2421  {
2422  if (stream_started)
2423  {
2424  rb->stream_stop(rb, txn, prev_lsn);
2425  stream_started = false;
2426  }
2427  }
2428  else
2429  {
2430  /*
2431  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2432  * regular ones).
2433  */
2434  if (rbtxn_prepared(txn))
2435  rb->prepare(rb, txn, commit_lsn);
2436  else
2437  rb->commit(rb, txn, commit_lsn);
2438  }
2439 
2440  /* this is just a sanity check against bad output plugin behaviour */
2442  elog(ERROR, "output plugin used XID %u",
2444 
2445  /*
2446  * Remember the command ID and snapshot for the next set of changes in
2447  * streaming mode.
2448  */
2449  if (streaming)
2450  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2451  else if (snapshot_now->copied)
2452  ReorderBufferFreeSnap(rb, snapshot_now);
2453 
2454  /* cleanup */
2455  TeardownHistoricSnapshot(false);
2456 
2457  /*
2458  * Aborting the current (sub-)transaction as a whole has the right
2459  * semantics. We want all locks acquired in here to be released, not
2460  * reassigned to the parent and we do not want any database access
2461  * have persistent effects.
2462  */
2464 
2465  /* make sure there's no cache pollution */
2467 
2468  if (using_subtxn)
2470 
2471  /*
2472  * We are here due to one of the four reasons: 1. Decoding an
2473  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2474  * prepared txn that was (partially) streamed. 4. Decoding a committed
2475  * txn.
2476  *
2477  * For 1, we allow truncation of txn data by removing the changes
2478  * already streamed but still keeping other things like invalidations,
2479  * snapshot, and tuplecids. For 2 and 3, we indicate
2480  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2481  * data as the entire transaction has been decoded except for commit.
2482  * For 4, as the entire txn has been decoded, we can fully clean up
2483  * the TXN reorder buffer.
2484  */
2485  if (streaming || rbtxn_prepared(txn))
2486  {
2488  /* Reset the CheckXidAlive */
2490  }
2491  else
2492  ReorderBufferCleanupTXN(rb, txn);
2493  }
2494  PG_CATCH();
2495  {
2496  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2497  ErrorData *errdata = CopyErrorData();
2498 
2499  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2500  if (iterstate)
2501  ReorderBufferIterTXNFinish(rb, iterstate);
2502 
2504 
2505  /*
2506  * Force cache invalidation to happen outside of a valid transaction
2507  * to prevent catalog access as we just caught an error.
2508  */
2510 
2511  /* make sure there's no cache pollution */
2513  txn->invalidations);
2514 
2515  if (using_subtxn)
2517 
2518  /*
2519  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2520  * abort of the (sub)transaction we are streaming or preparing. We
2521  * need to do the cleanup and return gracefully on this error, see
2522  * SetupCheckXidLive.
2523  *
2524  * This error code can be thrown by one of the callbacks we call
2525  * during decoding so we need to ensure that we return gracefully only
2526  * when we are sending the data in streaming mode and the streaming is
2527  * not finished yet or when we are sending the data out on a PREPARE
2528  * during a two-phase commit.
2529  */
2530  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2531  (stream_started || rbtxn_prepared(txn)))
2532  {
2533  /* curtxn must be set for streaming or prepared transactions */
2534  Assert(curtxn);
2535 
2536  /* Cleanup the temporary error state. */
2537  FlushErrorState();
2538  FreeErrorData(errdata);
2539  errdata = NULL;
2540  curtxn->concurrent_abort = true;
2541 
2542  /* Reset the TXN so that it is allowed to stream remaining data. */
2543  ReorderBufferResetTXN(rb, txn, snapshot_now,
2544  command_id, prev_lsn,
2545  specinsert);
2546  }
2547  else
2548  {
2549  ReorderBufferCleanupTXN(rb, txn);
2550  MemoryContextSwitchTo(ecxt);
2551  PG_RE_THROW();
2552  }
2553  }
2554  PG_END_TRY();
2555 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsToastRelation(Relation relation)
Definition: catalog.c:146
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_prepared(txn)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:381
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:674
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void FlushErrorState(void)
Definition: elog.c:1654
#define rbtxn_is_streamed(txn)
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
#define ERROR
Definition: elog.h:46
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define RelationIsValid(relation)
Definition: rel.h:450
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferBeginCB begin_prepare
void RelationClose(Relation relation)
Definition: relcache.c:2101
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:90
TransactionId CheckXidAlive
Definition: xact.c:95
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:1093
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:604
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
#define PG_CATCH()
Definition: elog.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2838
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
SharedInvalidationMessage * invalidations
#define PG_RE_THROW()
Definition: elog.h:354
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:232
int i
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:313
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1995
#define PG_END_TRY()
Definition: elog.h:338
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3014 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

3015 {
3016  /* many records won't have an xid assigned, centralize check here */
3017  if (xid != InvalidTransactionId)
3018  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3019 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 764 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

766 {
767  ReorderBufferTXN *txn;
768 
769  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
770 
771  /*
772  * While streaming the previous changes we have detected that the
773  * transaction is aborted. So there is no point in collecting further
774  * changes for it.
775  */
776  if (txn->concurrent_abort)
777  {
778  /*
779  * We don't need to update memory accounting for this change as we
780  * have not added it to the queue yet.
781  */
782  ReorderBufferReturnChange(rb, change, false);
783  return;
784  }
785 
786  change->lsn = lsn;
787  change->txn = txn;
788 
789  Assert(InvalidXLogRecPtr != lsn);
790  dlist_push_tail(&txn->changes, &change->node);
791  txn->nentries++;
792  txn->nentries_mem++;
793 
794  /* update memory accounting information */
795  ReorderBufferChangeMemoryUpdate(rb, change, true);
796 
797  /* process partial change */
798  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
799 
800  /* check the memory limits and evict something if needed */
802 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 809 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

813 {
814  if (transactional)
815  {
816  MemoryContext oldcontext;
817  ReorderBufferChange *change;
818 
820 
821  oldcontext = MemoryContextSwitchTo(rb->context);
822 
823  change = ReorderBufferGetChange(rb);
825  change->data.msg.prefix = pstrdup(prefix);
826  change->data.msg.message_size = message_size;
827  change->data.msg.message = palloc(message_size);
828  memcpy(change->data.msg.message, message, message_size);
829 
830  ReorderBufferQueueChange(rb, xid, lsn, change, false);
831 
832  MemoryContextSwitchTo(oldcontext);
833  }
834  else
835  {
836  ReorderBufferTXN *txn = NULL;
837  volatile Snapshot snapshot_now = snapshot;
838 
839  if (xid != InvalidTransactionId)
840  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
841 
842  /* setup snapshot to allow catalog access */
843  SetupHistoricSnapshot(snapshot_now, NULL);
844  PG_TRY();
845  {
846  rb->message(rb, txn, lsn, false, prefix, message_size, message);
847 
849  }
850  PG_CATCH();
851  {
853  PG_RE_THROW();
854  }
855  PG_END_TRY();
856  }
857 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
union ReorderBufferChange::@97 data
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:323
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
#define PG_RE_THROW()
Definition: elog.h:354
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2651 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

2655 {
2656  ReorderBufferTXN *txn;
2657 
2658  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2659 
2660  /* unknown transaction, nothing to do */
2661  if (txn == NULL)
2662  return false;
2663 
2664  /*
2665  * Remember the prepare information to be later used by commit prepared in
2666  * case we skip doing prepare.
2667  */
2668  txn->final_lsn = prepare_lsn;
2669  txn->end_lsn = end_lsn;
2670  txn->xact_time.prepare_time = prepare_time;
2671  txn->origin_id = origin_id;
2672  txn->origin_lsn = origin_lsn;
2673 
2674  return true;
2675 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TimestampTz prepare_time

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2568 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

2573 {
2574  Snapshot snapshot_now;
2575  CommandId command_id = FirstCommandId;
2576 
2577  txn->final_lsn = commit_lsn;
2578  txn->end_lsn = end_lsn;
2579  txn->xact_time.commit_time = commit_time;
2580  txn->origin_id = origin_id;
2581  txn->origin_lsn = origin_lsn;
2582 
2583  /*
2584  * If the transaction was (partially) streamed, we need to commit it in a
2585  * 'streamed' way. That is, we first stream the remaining part of the
2586  * transaction, and then invoke stream_commit message.
2587  *
2588  * Called after everything (origin ID, LSN, ...) is stored in the
2589  * transaction to avoid passing that information directly.
2590  */
2591  if (rbtxn_is_streamed(txn))
2592  {
2593  ReorderBufferStreamCommit(rb, txn);
2594  return;
2595  }
2596 
2597  /*
2598  * If this transaction has no snapshot, it didn't make any changes to the
2599  * database, so there's nothing to decode. Note that
2600  * ReorderBufferCommitChild will have transferred any snapshots from
2601  * subtransactions if there were any.
2602  */
2603  if (txn->base_snapshot == NULL)
2604  {
2605  Assert(txn->ninvalidations == 0);
2606 
2607  /*
2608  * Removing this txn before a commit might result in the computation
2609  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2610  */
2611  if (!rbtxn_prepared(txn))
2612  ReorderBufferCleanupTXN(rb, txn);
2613  return;
2614  }
2615 
2616  snapshot_now = txn->base_snapshot;
2617 
2618  /* Process and send the changes to output plugin. */
2619  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2620  command_id, false);
2621 }
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
Snapshot base_snapshot
#define rbtxn_prepared(txn)
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:603
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 1976 of file reorderbuffer.c.

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

1981 {
1982  /* Discard the changes that we just streamed */
1984 
1985  /* Free all resources allocated for toast reconstruction */
1986  ReorderBufferToastReset(rb, txn);
1987 
1988  /* Return the spec insert change if it is not NULL */
1989  if (specinsert != NULL)
1990  {
1991  ReorderBufferReturnChange(rb, specinsert, true);
1992  specinsert = NULL;
1993  }
1994 
1995  /*
1996  * For the streaming case, stop the stream and remember the command ID and
1997  * snapshot for the streaming run.
1998  */
1999  if (rbtxn_is_streamed(txn))
2000  {
2001  rb->stream_stop(rb, txn, last_lsn);
2002  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2003  }
2004 }
#define rbtxn_prepared(txn)
#define rbtxn_is_streamed(txn)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 4202 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

4204 {
4205  ReorderBufferDiskChange *ondisk;
4206  ReorderBufferChange *change;
4207 
4208  ondisk = (ReorderBufferDiskChange *) data;
4209 
4210  change = ReorderBufferGetChange(rb);
4211 
4212  /* copy static part */
4213  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4214 
4215  data += sizeof(ReorderBufferDiskChange);
4216 
4217  /* restore individual stuff */
4218  switch (change->action)
4219  {
4220  /* fall through these, they're all similar enough */
4225  if (change->data.tp.oldtuple)
4226  {
4227  uint32 tuplelen = ((HeapTuple) data)->t_len;
4228 
4229  change->data.tp.oldtuple =
4231 
4232  /* restore ->tuple */
4233  memcpy(&change->data.tp.oldtuple->tuple, data,
4234  sizeof(HeapTupleData));
4235  data += sizeof(HeapTupleData);
4236 
4237  /* reset t_data pointer into the new tuplebuf */
4238  change->data.tp.oldtuple->tuple.t_data =
4239  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4240 
4241  /* restore tuple data itself */
4242  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4243  data += tuplelen;
4244  }
4245 
4246  if (change->data.tp.newtuple)
4247  {
4248  /* here, data might not be suitably aligned! */
4249  uint32 tuplelen;
4250 
4251  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4252  sizeof(uint32));
4253 
4254  change->data.tp.newtuple =
4256 
4257  /* restore ->tuple */
4258  memcpy(&change->data.tp.newtuple->tuple, data,
4259  sizeof(HeapTupleData));
4260  data += sizeof(HeapTupleData);
4261 
4262  /* reset t_data pointer into the new tuplebuf */
4263  change->data.tp.newtuple->tuple.t_data =
4264  ReorderBufferTupleBufData(change->data.tp.newtuple);
4265 
4266  /* restore tuple data itself */
4267  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4268  data += tuplelen;
4269  }
4270 
4271  break;
4273  {
4274  Size prefix_size;
4275 
4276  /* read prefix */
4277  memcpy(&prefix_size, data, sizeof(Size));
4278  data += sizeof(Size);
4279  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4280  prefix_size);
4281  memcpy(change->data.msg.prefix, data, prefix_size);
4282  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4283  data += prefix_size;
4284 
4285  /* read the message */
4286  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4287  data += sizeof(Size);
4288  change->data.msg.message = MemoryContextAlloc(rb->context,
4289  change->data.msg.message_size);
4290  memcpy(change->data.msg.message, data,
4291  change->data.msg.message_size);
4292  data += change->data.msg.message_size;
4293 
4294  break;
4295  }
4297  {
4298  Size inval_size = sizeof(SharedInvalidationMessage) *
4299  change->data.inval.ninvalidations;
4300 
4301  change->data.inval.invalidations =
4302  MemoryContextAlloc(rb->context, inval_size);
4303 
4304  /* read the message */
4305  memcpy(change->data.inval.invalidations, data, inval_size);
4306 
4307  break;
4308  }
4310  {
4311  Snapshot oldsnap;
4312  Snapshot newsnap;
4313  Size size;
4314 
4315  oldsnap = (Snapshot) data;
4316 
4317  size = sizeof(SnapshotData) +
4318  sizeof(TransactionId) * oldsnap->xcnt +
4319  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4320 
4321  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4322 
4323  newsnap = change->data.snapshot;
4324 
4325  memcpy(newsnap, data, size);
4326  newsnap->xip = (TransactionId *)
4327  (((char *) newsnap) + sizeof(SnapshotData));
4328  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4329  newsnap->copied = true;
4330  break;
4331  }
4332  /* the base struct contains all the data, easy peasy */
4334  {
4335  Oid *relids;
4336 
4337  relids = ReorderBufferGetRelids(rb,
4338  change->data.truncate.nrelids);
4339  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4340  change->data.truncate.relids = relids;
4341 
4342  break;
4343  }
4348  break;
4349  }
4350 
4351  dlist_push_tail(&txn->changes, &change->node);
4352  txn->nentries_mem++;
4353 
4354  /*
4355  * Update memory accounting for the restored change. We need to do this
4356  * although we don't check the memory limit when restoring the changes in
4357  * this branch (we only do that when initially queueing the changes after
4358  * decoding), because we will release the changes later, and that will
4359  * update the accounting too (subtracting the size from the counters). And
4360  * we don't want to underflow there.
4361  */
4362  ReorderBufferChangeMemoryUpdate(rb, change, true);
4363 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:587
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:441
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
size_t Size
Definition: c.h:540
struct ReorderBufferChange::@97::@98 tp
struct ReorderBufferChange::@97::@99 truncate
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:727
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4061 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

4063 {
4064  Size restored = 0;
4065  XLogSegNo last_segno;
4066  dlist_mutable_iter cleanup_iter;
4067  File *fd = &file->vfd;
4068 
4071 
4072  /* free current entries, so we have memory for more */
4073  dlist_foreach_modify(cleanup_iter, &txn->changes)
4074  {
4076  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4077 
4078  dlist_delete(&cleanup->node);
4079  ReorderBufferReturnChange(rb, cleanup, true);
4080  }
4081  txn->nentries_mem = 0;
4082  Assert(dlist_is_empty(&txn->changes));
4083 
4084  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4085 
4086  while (restored < max_changes_in_memory && *segno <= last_segno)
4087  {
4088  int readBytes;
4089  ReorderBufferDiskChange *ondisk;
4090 
4091  if (*fd == -1)
4092  {
4093  char path[MAXPGPATH];
4094 
4095  /* first time in */
4096  if (*segno == 0)
4097  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4098 
4099  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4100 
4101  /*
4102  * No need to care about TLIs here, only used during a single run,
4103  * so each LSN only maps to a specific WAL record.
4104  */
4106  *segno);
4107 
4108  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4109 
4110  /* No harm in resetting the offset even in case of failure */
4111  file->curOffset = 0;
4112 
4113  if (*fd < 0 && errno == ENOENT)
4114  {
4115  *fd = -1;
4116  (*segno)++;
4117  continue;
4118  }
4119  else if (*fd < 0)
4120  ereport(ERROR,
4122  errmsg("could not open file \"%s\": %m",
4123  path)));
4124  }
4125 
4126  /*
4127  * Read the statically sized part of a change which has information
4128  * about the total size. If we couldn't read a record, we're at the
4129  * end of this file.
4130  */
4132  readBytes = FileRead(file->vfd, rb->outbuf,
4133  sizeof(ReorderBufferDiskChange),
4135 
4136  /* eof */
4137  if (readBytes == 0)
4138  {
4139  FileClose(*fd);
4140  *fd = -1;
4141  (*segno)++;
4142  continue;
4143  }
4144  else if (readBytes < 0)
4145  ereport(ERROR,
4147  errmsg("could not read from reorderbuffer spill file: %m")));
4148  else if (readBytes != sizeof(ReorderBufferDiskChange))
4149  ereport(ERROR,
4151  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",