PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_TOAST_INSERT   0x0020
 
#define RBTXN_HAS_SPEC_INSERT   0x0040
 
#define RBTXN_PREPARE   0x0080
 
#define RBTXN_SKIPPED_PREPARE   0x0100
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_toast_insert(txn)
 
#define rbtxn_has_spec_insert(txn)
 
#define rbtxn_has_incomplete_tuple(txn)
 
#define rbtxn_is_streamed(txn)
 
#define rbtxn_prepared(txn)
 
#define rbtxn_skip_prepared(txn)
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *, bool)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 

Macro Definition Documentation

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 170 of file reorderbuffer.h.

Referenced by ReorderBufferXidSetCatalogChanges().

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 181 of file reorderbuffer.h.

Referenced by ReorderBufferBuildTupleCidHash(), and ReorderBufferXidHasCatalogChanges().

◆ rbtxn_has_incomplete_tuple

#define rbtxn_has_incomplete_tuple (   txn)
Value:
( \
rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
)
#define rbtxn_has_spec_insert(txn)

Definition at line 219 of file reorderbuffer.h.

Referenced by ReorderBufferLargestTopTXN(), and ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_SPEC_INSERT

#define RBTXN_HAS_SPEC_INSERT   0x0040

Definition at line 176 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_spec_insert

#define rbtxn_has_spec_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
)
#define RBTXN_HAS_SPEC_INSERT

Definition at line 213 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_TOAST_INSERT

#define RBTXN_HAS_TOAST_INSERT   0x0020

Definition at line 175 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_toast_insert

#define rbtxn_has_toast_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
)
#define RBTXN_HAS_TOAST_INSERT

Definition at line 205 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 172 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN(), and ReorderBufferTruncateTXN().

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 173 of file reorderbuffer.h.

Referenced by ReorderBufferTruncateTXN().

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 199 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN().

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 174 of file reorderbuffer.h.

Referenced by ReorderBufferTruncateTXN().

◆ rbtxn_is_streamed

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 171 of file reorderbuffer.h.

Referenced by ReorderBufferAssignChild().

◆ RBTXN_PREPARE

#define RBTXN_PREPARE   0x0080

Definition at line 177 of file reorderbuffer.h.

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ rbtxn_prepared

#define rbtxn_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
#define RBTXN_PREPARE

Definition at line 239 of file reorderbuffer.h.

Referenced by ReorderBufferProcessTXN(), ReorderBufferReplay(), ReorderBufferResetTXN(), ReorderBufferStreamCommit(), and SnapBuildDistributeNewCatalogSnapshot().

◆ rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE

Definition at line 245 of file reorderbuffer.h.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

◆ RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0100

Definition at line 178 of file reorderbuffer.h.

Referenced by ReorderBufferSkipPrepare().

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 411 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 414 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 420 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 427 of file reorderbuffer.h.

◆ ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 444 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 431 of file reorderbuffer.h.

◆ ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 453 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 436 of file reorderbuffer.h.

◆ ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 448 of file reorderbuffer.h.

◆ ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 458 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 476 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 494 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 488 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 501 of file reorderbuffer.h.

◆ ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 482 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 464 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 470 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 510 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 54 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2786 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2787 {
2788  ReorderBufferTXN *txn;
2789 
2790  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2791  false);
2792 
2793  /* unknown, nothing to remove */
2794  if (txn == NULL)
2795  return;
2796 
2797  /* For streamed transactions notify the remote node about the abort. */
2798  if (rbtxn_is_streamed(txn))
2799  {
2800  rb->stream_abort(rb, txn, lsn);
2801 
2802  /*
2803  * We might have decoded changes for this transaction that could load
2804  * the cache as per the current transaction's view (consider DDL's
2805  * happened in this transaction). We don't want the decoding of future
2806  * transactions to use those cache entries so execute invalidations.
2807  */
2808  if (txn->ninvalidations > 0)
2809  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2810  txn->invalidations);
2811  }
2812 
2813  /* cosmetic... */
2814  txn->final_lsn = lsn;
2815 
2816  /* remove potential on-disk data, and deallocate */
2817  ReorderBufferCleanupTXN(rb, txn);
2818 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2828 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2829 {
2830  dlist_mutable_iter it;
2831 
2832  /*
2833  * Iterate through all (potential) toplevel TXNs and abort all that are
2834  * older than what possibly can be running. Once we've found the first
2835  * that is alive we stop, there might be some that acquired an xid earlier
2836  * but started writing later, but it's unlikely and they will be cleaned
2837  * up in a later call to this function.
2838  */
2839  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2840  {
2841  ReorderBufferTXN *txn;
2842 
2843  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2844 
2845  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2846  {
2847  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2848 
2849  /* remove potential on-disk data, and deallocate this tx */
2850  ReorderBufferCleanupTXN(rb, txn);
2851  }
2852  else
2853  return;
2854  }
2855 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:232

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3165 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

3168 {
3169  ReorderBufferTXN *txn;
3170  MemoryContext oldcontext;
3171  ReorderBufferChange *change;
3172 
3173  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3174 
3175  oldcontext = MemoryContextSwitchTo(rb->context);
3176 
3177  /*
3178  * Collect all the invalidations under the top transaction so that we can
3179  * execute them all together. See comment atop this function
3180  */
3181  if (txn->toptxn)
3182  txn = txn->toptxn;
3183 
3184  Assert(nmsgs > 0);
3185 
3186  /* Accumulate invalidations. */
3187  if (txn->ninvalidations == 0)
3188  {
3189  txn->ninvalidations = nmsgs;
3190  txn->invalidations = (SharedInvalidationMessage *)
3191  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3192  memcpy(txn->invalidations, msgs,
3193  sizeof(SharedInvalidationMessage) * nmsgs);
3194  }
3195  else
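3196  {
3197  txn->invalidations = (SharedInvalidationMessage *)
3198  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *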
3199  (txn->ninvalidations + nmsgs));
3200 
3201  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3202  nmsgs * sizeof(SharedInvalidationMessage));
3203  txn->ninvalidations += nmsgs;
3204  }
3205 
3206  change = ReorderBufferGetChange(rb);
3207  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3208  change->data.inval.ninvalidations = nmsgs;
3209  change->data.inval.invalidations = (SharedInvalidationMessage *)
3210  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3211  memcpy(change->data.inval.invalidations, msgs,
3212  sizeof(SharedInvalidationMessage) * nmsgs);
3213 
3214  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3215 
3216  MemoryContextSwitchTo(oldcontext);
3217 }
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
struct ReorderBufferChange::@97::@102 inval
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3042 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

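3044 {
3045  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3046 
3047  change->data.command_id = cid;
3048  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;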
3049 
3050  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3051 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3129 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

3133 {
3134  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3135  ReorderBufferTXN *txn;
3136 
3137  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3138 
3139  change->data.tuplecid.node = node;
3140  change->data.tuplecid.tid = tid;
3141  change->data.tuplecid.cmin = cmin;
3142  change->data.tuplecid.cmax = cmax;
3143  change->data.tuplecid.combocid = combocid;
3144  change->lsn = lsn;
3145  change->txn = txn;
3146  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3147 
3148  dlist_push_tail(&txn->tuplecids, &change->node);
3149  txn->ntuplecids++;
3150 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@101 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 299 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

300 {
301  ReorderBuffer *buffer;
302  HASHCTL hash_ctl;
303  MemoryContext new_ctx;
304 
305  Assert(MyReplicationSlot != NULL);
306 
307  /* allocate memory in own context, to have better accountability */
308  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
309  "ReorderBuffer",
311 
312  buffer =
313  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
314 
315  memset(&hash_ctl, 0, sizeof(hash_ctl));
316 
317  buffer->context = new_ctx;
318 
319  buffer->change_context = SlabContextCreate(new_ctx,
320  "Change",
321  SLAB_DEFAULT_BLOCK_SIZE,
322  sizeof(ReorderBufferChange));
323 
324  buffer->txn_context = SlabContextCreate(new_ctx,
325  "TXN",
326  SLAB_DEFAULT_BLOCK_SIZE,
327  sizeof(ReorderBufferTXN));
328 
329  buffer->tup_context = GenerationContextCreate(new_ctx,
330  "Tuples",
331  SLAB_LARGE_BLOCK_SIZE);
332 
333  hash_ctl.keysize = sizeof(TransactionId);
334  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
335  hash_ctl.hcxt = buffer->context;
336 
337  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
339 
340  buffer->by_txn_last_xid = InvalidTransactionId;
341  buffer->by_txn_last_txn = NULL;
342 
343  buffer->outbuf = NULL;
344  buffer->outbufsize = 0;
345  buffer->size = 0;
346 
347  buffer->spillTxns = 0;
348  buffer->spillCount = 0;
349  buffer->spillBytes = 0;
350  buffer->streamTxns = 0;
351  buffer->streamCount = 0;
352  buffer->streamBytes = 0;
353  buffer->totalTxns = 0;
354  buffer->totalBytes = 0;
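355 
356  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
357 
358  dlist_init(&buffer->toplevel_by_lsn);
359  dlist_init(&buffer->txns_by_base_snapshot_lsn);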
360 
361  /*
362  * Ensure there's no stale data from prior uses of this slot, in case some
363  * prior exit avoided calling ReorderBufferFree. Failure to do this can
364  * produce duplicated txns, and it's very cheap if there's nothing there.
365  */
366  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
367 
368  return buffer;
369 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:76
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
ReplicationSlotPersistentData data
Definition: slot.h:156
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:197
MemoryContext context
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:75
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:224
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define NameStr(name)
Definition: c.h:681
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 1001 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

1003 {
1004  ReorderBufferTXN *txn;
1005  ReorderBufferTXN *subtxn;
1006  bool new_top;
1007  bool new_sub;
1008 
1009  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1010  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1011 
1012  if (!new_sub)
1013  {
1014  if (rbtxn_is_known_subxact(subtxn))
1015  {
1016  /* already associated, nothing to do */
1017  return;
1018  }
1019  else
1020  {
1021  /*
1022  * We already saw this transaction, but initially added it to the
1023  * list of top-level txns. Now that we know it's not top-level,
1024  * remove it from there.
1025  */
1026  dlist_delete(&subtxn->node);
1027  }
1028  }
1029 
1030  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1031  subtxn->toplevel_xid = xid;
1032  Assert(subtxn->nsubtxns == 0);
1033 
1034  /* set the reference to top-level transaction */
1035  subtxn->toptxn = txn;
1036 
1037  /* add to subtransaction list */
1038  dlist_push_tail(&txn->subtxns, &subtxn->node);
1039  txn->nsubtxns++;
1040 
1041  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1042  ReorderBufferTransferSnapToParent(txn, subtxn);
1043 
1044  /* Verify LSN-ordering invariant */
1045  AssertTXNLsnOrder(rb);
1046 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2599 of file reorderbuffer.c.

References InvalidXLogRecPtr, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2603 {
2604  ReorderBufferTXN *txn;
2605 
2606  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2607  false);
2608 
2609  /* unknown transaction, nothing to replay */
2610  if (txn == NULL)
2611  return;
2612 
2613  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2614  origin_id, origin_lsn);
2615 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1121 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

1124 {
1125  ReorderBufferTXN *subtxn;
1126 
1127  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1128  InvalidXLogRecPtr, false);
1129 
1130  /*
1131  * No need to do anything if that subtxn didn't contain any changes
1132  */
1133  if (!subtxn)
1134  return;
1135 
1136  subtxn->final_lsn = commit_lsn;
1137  subtxn->end_lsn = end_lsn;
1138 
1139  /*
1140  * Assign this subxact as a child of the toplevel xact (no-op if already
1141  * done.)
1142  */
1143  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1144 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  initial_consistent_point,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2702 of file reorderbuffer.c.

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeAbort(), and DecodeCommit().

2707 {
2708  ReorderBufferTXN *txn;
2709  XLogRecPtr prepare_end_lsn;
2710  TimestampTz prepare_time;
2711 
2712  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2713 
2714  /* unknown transaction, nothing to do */
2715  if (txn == NULL)
2716  return;
2717 
2718  /*
2719  * By this time the txn has the prepare record information, remember it to
2720  * be later used for rollback.
2721  */
2722  prepare_end_lsn = txn->end_lsn;
2723  prepare_time = txn->commit_time;
2724 
2725  /* add the gid in the txn */
2726  txn->gid = pstrdup(gid);
2727 
2728  /*
2729  * It is possible that this transaction is not decoded at prepare time
2730  * either because by that time we didn't have a consistent snapshot or it
2731  * was decoded earlier but we have restarted. We only need to send the
2732  * prepare if it was not decoded earlier. We don't need to decode the xact
2733  * for aborts if it is not done already.
2734  */
2735  if ((txn->final_lsn < initial_consistent_point) && is_commit)
2736  {
2737  txn->txn_flags |= RBTXN_PREPARE;
2738 
2739  /*
2740  * The prepare info must have been updated in txn even if we skip
2741  * prepare.
2742  */
2743  Assert(txn->final_lsn != InvalidXLogRecPtr);
2744 
2745  /*
2746  * By this time the txn has the prepare record information and it is
2747  * important to use that so that downstream gets the accurate
2748  * information. If instead, we have passed commit information here
2749  * then downstream can behave as it has already replayed commit
2750  * prepared after the restart.
2751  */
2752  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2753  txn->commit_time, txn->origin_id, txn->origin_lsn);
2754  }
2755 
2756  txn->final_lsn = commit_lsn;
2757  txn->end_lsn = end_lsn;
2758  txn->commit_time = commit_time;
2759  txn->origin_id = origin_id;
2760  txn->origin_lsn = origin_lsn;
2761 
2762  if (is_commit)
2763  rb->commit_prepared(rb, txn, commit_lsn);
2764  else
2765  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2766 
2767  /* cleanup: make sure there's no cache pollution */
2768  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2769  txn->invalidations);
2770  ReorderBufferCleanupTXN(rb, txn);
2771 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
ReorderBufferRollbackPreparedCB rollback_prepared
SharedInvalidationMessage * invalidations
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
ReorderBufferCommitPreparedCB commit_prepared

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2871 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2872 {
2873  ReorderBufferTXN *txn;
2874 
2875  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2876  false);
2877 
2878  /* unknown, nothing to forget */
2879  if (txn == NULL)
2880  return;
2881 
2882  /* For streamed transactions notify the remote node about the abort. */
2883  if (rbtxn_is_streamed(txn))
2884  rb->stream_abort(rb, txn, lsn);
2885 
2886  /* cosmetic... */
2887  txn->final_lsn = lsn;
2888 
2889  /*
2890  * Process cache invalidation messages if there are any. Even if we're not
2891  * interested in the transaction's contents, it could have manipulated the
2892  * catalog and we need to update the caches according to that.
2893  */
2894  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2895  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2896  txn->invalidations);
2897  else
2898  Assert(txn->ninvalidations == 0);
2899 
2900  /* remove potential on-disk data, and deallocate */
2901  ReorderBufferCleanupTXN(rb, txn);
2902 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * )

Definition at line 375 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

376 {
377  MemoryContext context = rb->context;
378 
379  /*
380  * We free separately allocated data by entirely scrapping reorderbuffer's
381  * memory context.
382  */
383  MemoryContextDelete(context);
384 
385  /* Free disk space used by unconsumed reorder buffers */
386  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
387 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
ReplicationSlotPersistentData data
Definition: slot.h:156
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:681
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 453 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

454 {
455  ReorderBufferChange *change;
456 
457  change = (ReorderBufferChange *)
458  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
459 
460  memset(change, 0, sizeof(ReorderBufferChange));
461  return change;
462 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * )

Definition at line 946 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

947 {
948  ReorderBufferTXN *txn;
949 
950  AssertTXNLsnOrder(rb);
951 
952  if (dlist_is_empty(&rb->toplevel_by_lsn))
953  return NULL;
954 
955  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
956 
957  Assert(!rbtxn_is_known_subxact(txn));
958  Assert(txn->first_lsn != InvalidXLogRecPtr);
959  return txn;
960 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:804
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 974 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

975 {
976  ReorderBufferTXN *txn;
977 
978  AssertTXNLsnOrder(rb);
979 
980  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
981  return InvalidTransactionId;
982 
983  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
984  &rb->txns_by_base_snapshot_lsn);
985  return txn->base_snapshot->xmin;
986 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * ,
int  nrelids 
)

Definition at line 572 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

573 {
574  Oid *relids;
575  Size alloc_len;
576 
577  alloc_len = sizeof(Oid) * nrelids;
578 
579  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
580 
581  return relids;
582 }
unsigned int Oid
Definition: postgres_ext.h:31
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * ,
Size  tuple_len 
)

Definition at line 536 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

537 {
538  ReorderBufferTupleBuf *tuple;
539  Size alloc_len;
540 
541  alloc_len = tuple_len + SizeofHeapTupleHeader;
542 
543  tuple = (ReorderBufferTupleBuf *)
544  MemoryContextAlloc(rb->tup_context,
545  sizeof(ReorderBufferTupleBuf) +
546  MAXIMUM_ALIGNOF + alloc_len);
547  tuple->alloc_tuple_size = alloc_len;
548  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
549 
550  return tuple;
551 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:540
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations
)

Definition at line 2944 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), ReorderBufferForget(), and ReorderBufferInvalidate().

2946 {
2947  bool use_subtxn = IsTransactionOrTransactionBlock();
2948  int i;
2949 
2950  if (use_subtxn)
2951  BeginInternalSubTransaction("replay");
2952 
2953  /*
2954  * Force invalidations to happen outside of a valid transaction - that way
2955  * entries will just be marked as invalid without accessing the catalog.
2956  * That's advantageous because we don't need to setup the full state
2957  * necessary for catalog access.
2958  */
2959  if (use_subtxn)
2960  AbortCurrentTransaction();
2961 
2962  for (i = 0; i < ninvalidations; i++)
2963  LocalExecuteInvalidationMessage(&invalidations[i]);
2964 
2965  if (use_subtxn)
2966  RollbackAndReleaseCurrentSubTransaction();
2967 }
void AbortCurrentTransaction(void)
Definition: xact.c:3210
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4407
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:562
int i

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2913 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2914 {
2915  ReorderBufferTXN *txn;
2916 
2917  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2918  false);
2919 
2920  /* unknown, nothing to do */
2921  if (txn == NULL)
2922  return;
2923 
2924  /*
2925  * Process cache invalidation messages if there are any. Even if we're not
2926  * interested in the transaction's contents, it could have manipulated the
2927  * catalog and we need to update the caches according to that.
2928  */
2929  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2930  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2931  txn->invalidations);
2932  else
2933  Assert(txn->ninvalidations == 0);
2934 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
#define Assert(condition)
Definition: c.h:804
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2668 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, pstrdup(), RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

2670 {
2671  ReorderBufferTXN *txn;
2672 
2673  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2674  false);
2675 
2676  /* unknown transaction, nothing to replay */
2677  if (txn == NULL)
2678  return;
2679 
2680  txn->txn_flags |= RBTXN_PREPARE;
2681  txn->gid = pstrdup(gid);
2682 
2683  /* The prepare info must have been updated in txn by now. */
2684  Assert(txn->final_lsn != InvalidXLogRecPtr);
2685 
2686  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2687  txn->commit_time, txn->origin_id, txn->origin_lsn);
2688 
2689  /*
2690  * We send the prepare for the concurrently aborted xacts so that later
2691  * when rollback prepared is decoded and sent, the downstream should be
2692  * able to rollback such a xact. See comments atop DecodePrepare.
2693  */
2694  if (txn->concurrent_abort)
2695  rb->prepare(rb, txn, txn->final_lsn);
2696 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
char * pstrdup(const char *in)
Definition: mcxt.c:1299
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define RBTXN_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferPrepareCB prepare

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2980 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2981 {
2982  /* many records won't have an xid assigned, centralize check here */
2983  if (xid != InvalidTransactionId)
2984  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2985 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange * ,
bool  toast_insert 
)

Definition at line 754 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

756 {
757  ReorderBufferTXN *txn;
758 
759  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
760 
761  /*
762  * While streaming the previous changes we have detected that the
763  * transaction is aborted. So there is no point in collecting further
764  * changes for it.
765  */
766  if (txn->concurrent_abort)
767  {
768  /*
769  * We don't need to update memory accounting for this change as we
770  * have not added it to the queue yet.
771  */
772  ReorderBufferReturnChange(rb, change, false);
773  return;
774  }
775 
776  change->lsn = lsn;
777  change->txn = txn;
778 
779  Assert(InvalidXLogRecPtr != lsn);
780  dlist_push_tail(&txn->changes, &change->node);
781  txn->nentries++;
782  txn->nentries_mem++;
783 
784  /* update memory accounting information */
785  ReorderBufferChangeMemoryUpdate(rb, change, true);
786 
787  /* process partial change */
788  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
789 
790  /* check the memory limits and evict something if needed */
791  ReorderBufferCheckMemoryLimit(rb);
792 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:804
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 799 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

803 {
804  if (transactional)
805  {
806  MemoryContext oldcontext;
807  ReorderBufferChange *change;
808 
810 
811  oldcontext = MemoryContextSwitchTo(rb->context);
812 
813  change = ReorderBufferGetChange(rb);
814  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
815  change->data.msg.prefix = pstrdup(prefix);
816  change->data.msg.message_size = message_size;
817  change->data.msg.message = palloc(message_size);
818  memcpy(change->data.msg.message, message, message_size);
819 
820  ReorderBufferQueueChange(rb, xid, lsn, change, false);
821 
822  MemoryContextSwitchTo(oldcontext);
823  }
824  else
825  {
826  ReorderBufferTXN *txn = NULL;
827  volatile Snapshot snapshot_now = snapshot;
828 
829  if (xid != InvalidTransactionId)
830  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
831 
832  /* setup snapshot to allow catalog access */
833  SetupHistoricSnapshot(snapshot_now, NULL);
834  PG_TRY();
835  {
836  rb->message(rb, txn, lsn, false, prefix, message_size, message);
837 
838  TeardownHistoricSnapshot(false);
839  }
840  PG_CATCH();
841  {
842  TeardownHistoricSnapshot(true);
843  PG_RE_THROW();
844  }
845  PG_END_TRY();
846  }
847 }
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@97 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:323
#define Assert(condition)
Definition: c.h:804
struct ReorderBufferChange::@97::@100 msg
#define PG_RE_THROW()
Definition: elog.h:354
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:1062
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2621 of file reorderbuffer.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodePrepare().

2625 {
2626  ReorderBufferTXN *txn;
2627 
2628  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2629 
2630  /* unknown transaction, nothing to do */
2631  if (txn == NULL)
2632  return false;
2633 
2634  /*
2635  * Remember the prepare information to be later used by commit prepared in
2636  * case we skip doing prepare.
2637  */
2638  txn->final_lsn = prepare_lsn;
2639  txn->end_lsn = end_lsn;
2640  txn->commit_time = prepare_time;
2641  txn->origin_id = origin_id;
2642  txn->origin_lsn = origin_lsn;
2643 
2644  return true;
2645 }
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
XLogRecPtr origin_lsn
XLogRecPtr final_lsn
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange * ,
bool   
)

Definition at line 468 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

470 {
471  /* update memory accounting info */
472  if (upd_mem)
473  ReorderBufferChangeMemoryUpdate(rb, change, false);
474 
475  /* free contained data */
476  switch (change->action)
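477  {
478  case REORDER_BUFFER_CHANGE_INSERT:
479  case REORDER_BUFFER_CHANGE_UPDATE:
480  case REORDER_BUFFER_CHANGE_DELETE:
481  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: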
482  if (change->data.tp.newtuple)
483  {
484  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
485  change->data.tp.newtuple = NULL;
486  }
487 
488  if (change->data.tp.oldtuple)
489  {
490  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
491  change->data.tp.oldtuple = NULL;
492  }
493  break;
494  case REORDER_BUFFER_CHANGE_MESSAGE:
495  if (change->data.msg.prefix != NULL)
496  pfree(change->data.msg.prefix);
497  change->data.msg.prefix = NULL;
498  if (change->data.msg.message != NULL)
499  pfree(change->data.msg.message);
500  change->data.msg.message = NULL;
501  break;
502  case REORDER_BUFFER_CHANGE_INVALIDATION:
503  if (change->data.inval.invalidations)
504  pfree(change->data.inval.invalidations);
505  change->data.inval.invalidations = NULL;
506  break;
507  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
508  if (change->data.snapshot)
509  {
510  ReorderBufferFreeSnap(rb, change->data.snapshot);
511  change->data.snapshot = NULL;
512  }
513  break;
514  /* no data in addition to the struct itself */
515  case REORDER_BUFFER_CHANGE_TRUNCATE:
516  if (change->data.truncate.relids != NULL)
517  {
518  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
519  change->data.truncate.relids = NULL;
520  }
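521  break;
522  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
523  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
524  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: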
525  break;
526  }
527 
528  pfree(change);
529 }
void pfree(void *pointer)
Definition: mcxt.c:1169
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * ,
Oid * relids
)

Definition at line 588 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

589 {
590  pfree(relids);
591 }
void pfree(void *pointer)
Definition: mcxt.c:1169

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf * tuple
)

Definition at line 557 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

558 {
559  pfree(tuple);
560 }
void pfree(void *pointer)
Definition: mcxt.c:1169

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * ,
XLogRecPtr  ptr 
)

Definition at line 989 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

990 {
991  rb->current_restart_decoding_lsn = ptr;
992 }

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 2649 of file reorderbuffer.c.

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

2650 {
2651  ReorderBufferTXN *txn;
2652 
2653  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2654 
2655  /* unknown transaction, nothing to do */
2656  if (txn == NULL)
2657  return;
2658 
2659  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2660 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define RBTXN_SKIPPED_PREPARE
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid
)

Definition at line 3277 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

3278 {
3279  ReorderBufferTXN *txn;
3280 
3281  txn = ReorderBufferTXNByXid(rb, xid, false,
3282  NULL, InvalidXLogRecPtr, false);
3283 
3284  /* transaction isn't known yet, ergo no snapshot */
3285  if (txn == NULL)
3286  return false;
3287 
3288  /* a known subtxn? operate on top-level txn instead */
3289  if (rbtxn_is_known_subxact(txn))
3290  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3291  NULL, InvalidXLogRecPtr, false);
3292 
3293  return txn->base_snapshot != NULL;
3294 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid
)

Definition at line 3260 of file reorderbuffer.c.

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

3261 {
3262  ReorderBufferTXN *txn;
3263 
3264  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3265  false);
3266  if (txn == NULL)
3267  return false;
3268 
3269  return rbtxn_has_catalog_changes(txn);
3270 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_has_catalog_changes(txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn
)

Definition at line 3236 of file reorderbuffer.c.

References RBTXN_HAS_CATALOG_CHANGES, ReorderBufferTXNByXid(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeHeapOp(), DecodeXactOp(), and SnapBuildProcessNewCid().

3238 {
3239  ReorderBufferTXN *txn;
3240 
3241  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3242 
3243  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3244 
3245  /*
3246  * Mark top-level transaction as having catalog changes too if one of its
3247  * children has so that the ReorderBufferBuildTupleCidHash can
3248  * conveniently check just top-level transaction and decide whether to
3249  * build the hash table or not.
3250  */
3251  if (txn->toptxn != NULL)
3252  txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3253 }
#define RBTXN_HAS_CATALOG_CHANGES
struct ReorderBufferTXN * toptxn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4406 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

4407 {
4408  DIR *logical_dir;
4409  struct dirent *logical_de;
4410 
4411  logical_dir = AllocateDir("pg_replslot");
4412  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4413  {
4414  if (strcmp(logical_de->d_name, ".") == 0 ||
4415  strcmp(logical_de->d_name, "..") == 0)
4416  continue;
4417 
4418  /* if it cannot be a slot, skip the directory */
4419  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4420  continue;
4421 
4422  /*
4423  * ok, has to be a surviving logical slot, iterate and delete
4424  * everything starting with xid-*
4425  */
4426  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4427  }
4428  FreeDir(logical_dir);
4429 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2634
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2700
char d_name[MAX_PATH]
Definition: dirent.h:15
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2752

Variable Documentation

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem

Definition at line 208 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().