PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_TOAST_INSERT   0x0020
 
#define RBTXN_HAS_SPEC_INSERT   0x0040
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_toast_insert(txn)
 
#define rbtxn_has_spec_insert(txn)
 
#define rbtxn_has_incomplete_tuple(txn)
 
#define rbtxn_is_streamed(txn)
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *, bool)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 

Macro Definition Documentation

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 170 of file reorderbuffer.h.

Referenced by ReorderBufferXidSetCatalogChanges().

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 179 of file reorderbuffer.h.

Referenced by ReorderBufferBuildTupleCidHash(), and ReorderBufferXidHasCatalogChanges().

◆ rbtxn_has_incomplete_tuple

#define rbtxn_has_incomplete_tuple (   txn)
Value:
( \
rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
)
#define rbtxn_has_spec_insert(txn)

Definition at line 217 of file reorderbuffer.h.

Referenced by ReorderBufferLargestTopTXN(), and ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_SPEC_INSERT

#define RBTXN_HAS_SPEC_INSERT   0x0040

Definition at line 176 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_spec_insert

#define rbtxn_has_spec_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
)
#define RBTXN_HAS_SPEC_INSERT

Definition at line 211 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_TOAST_INSERT

#define RBTXN_HAS_TOAST_INSERT   0x0020

Definition at line 175 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_toast_insert

#define rbtxn_has_toast_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
)
#define RBTXN_HAS_TOAST_INSERT

Definition at line 203 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 172 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN(), and ReorderBufferTruncateTXN().

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 173 of file reorderbuffer.h.

Referenced by ReorderBufferTruncateTXN().

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 197 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN().

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 174 of file reorderbuffer.h.

Referenced by ReorderBufferTruncateTXN().

◆ rbtxn_is_streamed

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 171 of file reorderbuffer.h.

Referenced by ReorderBufferAssignChild().

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 389 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 392 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 398 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 405 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 409 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 414 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 434 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 446 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 440 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 453 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 422 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 428 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 462 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 54 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2491 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2492 {
2493  ReorderBufferTXN *txn;
2494 
2495  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2496  false);
2497 
2498  /* unknown, nothing to remove */
2499  if (txn == NULL)
2500  return;
2501 
2502  /* For streamed transactions notify the remote node about the abort. */
2503  if (rbtxn_is_streamed(txn))
2504  {
2505  rb->stream_abort(rb, txn, lsn);
2506 
2507  /*
2508  * We might have decoded changes for this transaction that could load
2509  * the cache as per the current transaction's view (consider DDL's
2510  * happened in this transaction). We don't want the decoding of future
2511  * transactions to use those cache entries so execute invalidations.
2512  */
2513  if (txn->ninvalidations > 0)
2514  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2515  txn->invalidations);
2516  }
2517 
2518  /* cosmetic... */
2519  txn->final_lsn = lsn;
2520 
2521  /* remove potential on-disk data, and deallocate */
2522  ReorderBufferCleanupTXN(rb, txn);
2523 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  ,
TransactionId  xid 
)

Definition at line 2533 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2534 {
2535  dlist_mutable_iter it;
2536 
2537  /*
2538  * Iterate through all (potential) toplevel TXNs and abort all that are
2539  * older than what possibly can be running. Once we've found the first
2540  * that is alive we stop, there might be some that acquired an xid earlier
2541  * but started writing later, but it's unlikely and they will be cleaned
2542  * up in a later call to this function.
2543  */
2544  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2545  {
2546  ReorderBufferTXN *txn;
2547 
2548  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2549 
2550  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2551  {
2552  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2553 
2554  /* remove potential on-disk data, and deallocate this tx */
2555  ReorderBufferCleanupTXN(rb, txn);
2556  }
2557  else
2558  return;
2559  }
2560 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:228

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 2837 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

2840 {
2841  ReorderBufferTXN *txn;
2842  MemoryContext oldcontext;
2843  ReorderBufferChange *change;
2844 
2845  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2846 
2847  oldcontext = MemoryContextSwitchTo(rb->context);
2848 
2849  /*
2850  * Collect all the invalidations under the top transaction so that we can
2851  * execute them all together. See comment atop this function
2852  */
2853  if (txn->toptxn)
2854  txn = txn->toptxn;
2855 
2856  Assert(nmsgs > 0);
2857 
2858  /* Accumulate invalidations. */
2859  if (txn->ninvalidations == 0)
2860  {
2861  txn->ninvalidations = nmsgs;
2862  txn->invalidations = (SharedInvalidationMessage *)
2863  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2864  memcpy(txn->invalidations, msgs,
2865  sizeof(SharedInvalidationMessage) * nmsgs);
2866  }
2867  else
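2868  {
2869  txn->invalidations = (SharedInvalidationMessage *)
2870  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
2871  (txn->ninvalidations + nmsgs));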
2872 
2873  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2874  nmsgs * sizeof(SharedInvalidationMessage));
2875  txn->ninvalidations += nmsgs;
2876  }
2877 
2878  change = ReorderBufferGetChange(rb);
2879  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
2880  change->data.inval.ninvalidations = nmsgs;
2881  change->data.inval.invalidations = (SharedInvalidationMessage *)
2882  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2883  memcpy(change->data.inval.invalidations, msgs,
2884  sizeof(SharedInvalidationMessage) * nmsgs);
2885 
2886  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2887 
2888  MemoryContextSwitchTo(oldcontext);
2889 }
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct ReorderBufferChange::@98::@103 inval
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:800
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2714 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

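2716 {
2717  ReorderBufferChange *change = ReorderBufferGetChange(rb);
2718 
2719  change->data.command_id = cid;
2720  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2721 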
2722  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2723 }
union ReorderBufferChange::@98 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2801 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2805 {
2806  ReorderBufferChange *change = ReorderBufferGetChange(rb);
2807  ReorderBufferTXN *txn;
2808 
2809  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2810 
2811  change->data.tuplecid.node = node;
2812  change->data.tuplecid.tid = tid;
2813  change->data.tuplecid.cmin = cmin;
2814  change->data.tuplecid.cmax = cmax;
2815  change->data.tuplecid.combocid = combocid;
2816  change->lsn = lsn;
2817  change->txn = txn;
2818  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2819 
2820  dlist_push_tail(&txn->tuplecids, &change->node);
2821  txn->ntuplecids++;
2822 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
union ReorderBufferChange::@98 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
struct ReorderBufferChange::@98::@102 tuplecid
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData *  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 298 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
307  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
308  "ReorderBuffer",
309  ALLOCSET_DEFAULT_SIZES);
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
320  SLAB_DEFAULT_BLOCK_SIZE,
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
325  SLAB_DEFAULT_BLOCK_SIZE,
326  sizeof(ReorderBufferTXN));
327 
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
330  SLAB_LARGE_BLOCK_SIZE);
331 
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
337  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
338 
339  buffer->by_txn_last_xid = InvalidTransactionId;
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
346  buffer->spillTxns = 0;
347  buffer->spillCount = 0;
348  buffer->spillBytes = 0;
349  buffer->streamTxns = 0;
350  buffer->streamCount = 0;
351  buffer->streamBytes = 0;
352 
353  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
354 
355  dlist_init(&buffer->toplevel_by_lsn);
356  dlist_init(&buffer->txns_by_base_snapshot_lsn);
357 
358  /*
359  * Ensure there's no stale data from prior uses of this slot, in case some
360  * prior exit avoided calling ReorderBufferFree. Failure to do this can
361  * produce duplicated txns, and it's very cheap if there's nothing there.
362  */
363  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
364 
365  return buffer;
366 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
uint32 TransactionId
Definition: c.h:575
MemoryContext hcxt
Definition: hsearch.h:77
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:72
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:71
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:800
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define NameStr(name)
Definition: c.h:677
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 991 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

993 {
994  ReorderBufferTXN *txn;
995  ReorderBufferTXN *subtxn;
996  bool new_top;
997  bool new_sub;
998 
999  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1000  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1001 
1002  if (!new_sub)
1003  {
1004  if (rbtxn_is_known_subxact(subtxn))
1005  {
1006  /* already associated, nothing to do */
1007  return;
1008  }
1009  else
1010  {
1011  /*
1012  * We already saw this transaction, but initially added it to the
1013  * list of top-level txns. Now that we know it's not top-level,
1014  * remove it from there.
1015  */
1016  dlist_delete(&subtxn->node);
1017  }
1018  }
1019 
1020  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1021  subtxn->toplevel_xid = xid;
1022  Assert(subtxn->nsubtxns == 0);
1023 
1024  /* set the reference to top-level transaction */
1025  subtxn->toptxn = txn;
1026 
1027  /* add to subtransaction list */
1028  dlist_push_tail(&txn->subtxns, &subtxn->node);
1029  txn->nsubtxns++;
1030 
1031  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1032  ReorderBufferTransferSnapToParent(txn, subtxn);
1033 
1034  /* Verify LSN-ordering invariant */
1035  AssertTXNLsnOrder(rb);
1036 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:800
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2422 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2426 {
2427  ReorderBufferTXN *txn;
2428  Snapshot snapshot_now;
2429  CommandId command_id = FirstCommandId;
2430 
2431  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2432  false);
2433 
2434  /* unknown transaction, nothing to replay */
2435  if (txn == NULL)
2436  return;
2437 
2438  txn->final_lsn = commit_lsn;
2439  txn->end_lsn = end_lsn;
2440  txn->commit_time = commit_time;
2441  txn->origin_id = origin_id;
2442  txn->origin_lsn = origin_lsn;
2443 
2444  /*
2445  * If the transaction was (partially) streamed, we need to commit it in a
2446  * 'streamed' way. That is, we first stream the remaining part of the
2447  * transaction, and then invoke stream_commit message.
2448  *
2449  * Called after everything (origin ID, LSN, ...) is stored in the
2450  * transaction to avoid passing that information directly.
2451  */
2452  if (rbtxn_is_streamed(txn))
2453  {
2454  ReorderBufferStreamCommit(rb, txn);
2455  return;
2456  }
2457 
2458  /*
2459  * If this transaction has no snapshot, it didn't make any changes to the
2460  * database, so there's nothing to decode. Note that
2461  * ReorderBufferCommitChild will have transferred any snapshots from
2462  * subtransactions if there were any.
2463  */
2464  if (txn->base_snapshot == NULL)
2465  {
2466  Assert(txn->ninvalidations == 0);
2467  ReorderBufferCleanupTXN(rb, txn);
2468  return;
2469  }
2470 
2471  snapshot_now = txn->base_snapshot;
2472 
2473  /* Process and send the changes to output plugin. */
2474  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2475  command_id, false);
2476 }
uint32 CommandId
Definition: c.h:589
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:591
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:800
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1111 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1114 {
1115  ReorderBufferTXN *subtxn;
1116 
1117  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1118  InvalidXLogRecPtr, false);
1119 
1120  /*
1121  * No need to do anything if that subtxn didn't contain any changes
1122  */
1123  if (!subtxn)
1124  return;
1125 
1126  subtxn->final_lsn = commit_lsn;
1127  subtxn->end_lsn = end_lsn;
1128 
1129  /*
1130  * Assign this subxact as a child of the toplevel xact (no-op if already
1131  * done.)
1132  */
1133  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1134 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2576 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2577 {
2578  ReorderBufferTXN *txn;
2579 
2580  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2581  false);
2582 
2583  /* unknown, nothing to forget */
2584  if (txn == NULL)
2585  return;
2586 
2587  /* For streamed transactions notify the remote node about the abort. */
2588  if (rbtxn_is_streamed(txn))
2589  rb->stream_abort(rb, txn, lsn);
2590 
2591  /* cosmetic... */
2592  txn->final_lsn = lsn;
2593 
2594  /*
2595  * Process cache invalidation messages if there are any. Even if we're not
2596  * interested in the transaction's contents, it could have manipulated the
2597  * catalog and we need to update the caches according to that.
2598  */
2599  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2600  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2601  txn->invalidations);
2602  else
2603  Assert(txn->ninvalidations == 0);
2604 
2605  /* remove potential on-disk data, and deallocate */
2606  ReorderBufferCleanupTXN(rb, txn);
2607 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:800
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * )

Definition at line 372 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

373 {
374  MemoryContext context = rb->context;
375 
376  /*
377  * We free separately allocated data by entirely scrapping reorderbuffer's
378  * memory context.
379  */
380  MemoryContextDelete(context);
381 
382  /* Free disk space used by unconsumed reorder buffers */
383  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
384 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
ReplicationSlotPersistentData data
Definition: slot.h:143
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:677
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 444 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

445 {
446  ReorderBufferChange *change;
447 
448  change = (ReorderBufferChange *)
449  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
450 
451  memset(change, 0, sizeof(ReorderBufferChange));
452  return change;
453 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * )

Definition at line 936 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

937 {
938  ReorderBufferTXN *txn;
939 
940  AssertTXNLsnOrder(rb);
941 
942  if (dlist_is_empty(&rb->toplevel_by_lsn))
943  return NULL;
944 
945  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
946 
947  Assert(!rbtxn_is_known_subxact(txn));
948  Assert(txn->first_lsn != InvalidXLogRecPtr);
949  return txn;
950 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 964 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

965 {
966  ReorderBufferTXN *txn;
967 
968  AssertTXNLsnOrder(rb);
969 
970  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
971  return InvalidTransactionId;
972 
973  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
974  &rb->txns_by_base_snapshot_lsn);
975  return txn->base_snapshot->xmin;
976 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * ,
int  nrelids 
)

Definition at line 563 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

564 {
565  Oid *relids;
566  Size alloc_len;
567 
568  alloc_len = sizeof(Oid) * nrelids;
569 
570  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
571 
572  return relids;
573 }
unsigned int Oid
Definition: postgres_ext.h:31
size_t Size
Definition: c.h:528
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * ,
Size  tuple_len 
)

Definition at line 527 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

528 {
529  ReorderBufferTupleBuf *tuple;
530  Size alloc_len;
531 
532  alloc_len = tuple_len + SizeofHeapTupleHeader;
533 
534  tuple = (ReorderBufferTupleBuf *)
535  MemoryContextAlloc(rb->tup_context,
536  sizeof(ReorderBufferTupleBuf) +
537  MAXIMUM_ALIGNOF + alloc_len);
538  tuple->alloc_tuple_size = alloc_len;
539  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
540 
541  return tuple;
542 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:528
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 2616 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), and ReorderBufferForget().

2618 {
2619  bool use_subtxn = IsTransactionOrTransactionBlock();
2620  int i;
2621 
2622  if (use_subtxn)
2623  BeginInternalSubTransaction("replay");
2624 
2625  /*
2626  * Force invalidations to happen outside of a valid transaction - that way
2627  * entries will just be marked as invalid without accessing the catalog.
2628  * That's advantageous because we don't need to setup the full state
2629  * necessary for catalog access.
2630  */
2631  if (use_subtxn)
2632  AbortCurrentTransaction();
2633 
2634  for (i = 0; i < ninvalidations; i++)
2635  LocalExecuteInvalidationMessage(&invalidations[i]);
2636 
2637  if (use_subtxn)
2638  RollbackAndReleaseCurrentSubTransaction();
2639 }
void AbortCurrentTransaction(void)
Definition: xact.c:3212
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4703
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4514
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4409
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2652 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2653 {
2654  /* many records won't have an xid assigned, centralize check here */
2655  if (xid != InvalidTransactionId)
2656  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2657 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange * ,
bool  toast_insert 
)

Definition at line 745 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

747 {
748  ReorderBufferTXN *txn;
749 
750  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
751 
752  /*
753  * While streaming the previous changes we have detected that the
754  * transaction is aborted. So there is no point in collecting further
755  * changes for it.
756  */
757  if (txn->concurrent_abort)
758  {
759  /*
760  * We don't need to update memory accounting for this change as we
761  * have not added it to the queue yet.
762  */
763  ReorderBufferReturnChange(rb, change, false);
764  return;
765  }
766 
767  change->lsn = lsn;
768  change->txn = txn;
769 
770  Assert(InvalidXLogRecPtr != lsn);
771  dlist_push_tail(&txn->changes, &change->node);
772  txn->nentries++;
773  txn->nentries_mem++;
774 
775  /* update memory accounting information */
776  ReorderBufferChangeMemoryUpdate(rb, change, true);
777 
778  /* process partial change */
779  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
780 
781  /* check the memory limits and evict something if needed */
782  ReorderBufferCheckMemoryLimit(rb);
783 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:800
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 789 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

793 {
794  if (transactional)
795  {
796  MemoryContext oldcontext;
797  ReorderBufferChange *change;
798 
799  Assert(xid != InvalidTransactionId);
800 
801  oldcontext = MemoryContextSwitchTo(rb->context);
802 
803  change = ReorderBufferGetChange(rb);
804  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
805  change->data.msg.prefix = pstrdup(prefix);
806  change->data.msg.message_size = message_size;
807  change->data.msg.message = palloc(message_size);
808  memcpy(change->data.msg.message, message, message_size);
809 
810  ReorderBufferQueueChange(rb, xid, lsn, change, false);
811 
812  MemoryContextSwitchTo(oldcontext);
813  }
814  else
815  {
816  ReorderBufferTXN *txn = NULL;
817  volatile Snapshot snapshot_now = snapshot;
818 
819  if (xid != InvalidTransactionId)
820  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
821 
822  /* setup snapshot to allow catalog access */
823  SetupHistoricSnapshot(snapshot_now, NULL);
824  PG_TRY();
825  {
826  rb->message(rb, txn, lsn, false, prefix, message_size, message);
827 
828  TeardownHistoricSnapshot(false);
829  }
830  PG_CATCH();
831  {
832  TeardownHistoricSnapshot(true);
833  PG_RE_THROW();
834  }
835  PG_END_TRY();
836  }
837 }
char * pstrdup(const char *in)
Definition: mcxt.c:1187
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2047
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define InvalidTransactionId
Definition: transam.h:31
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:319
#define Assert(condition)
Definition: c.h:800
#define PG_RE_THROW()
Definition: elog.h:350
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2031
#define PG_TRY()
Definition: elog.h:309
struct ReorderBufferChange::@98::@101 msg
#define PG_END_TRY()
Definition: elog.h:334

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange * ,
bool   
)

Definition at line 459 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

461 {
462  /* update memory accounting info */
463  if (upd_mem)
464  ReorderBufferChangeMemoryUpdate(rb, change, false);
465 
466  /* free contained data */
467  switch (change->action)
468  {
469  case REORDER_BUFFER_CHANGE_INSERT:
470  case REORDER_BUFFER_CHANGE_UPDATE:
471  case REORDER_BUFFER_CHANGE_DELETE:
472  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
473  if (change->data.tp.newtuple)
474  {
475  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
476  change->data.tp.newtuple = NULL;
477  }
478 
479  if (change->data.tp.oldtuple)
480  {
481  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
482  change->data.tp.oldtuple = NULL;
483  }
484  break;
485  case REORDER_BUFFER_CHANGE_MESSAGE:
486  if (change->data.msg.prefix != NULL)
487  pfree(change->data.msg.prefix);
488  change->data.msg.prefix = NULL;
489  if (change->data.msg.message != NULL)
490  pfree(change->data.msg.message);
491  change->data.msg.message = NULL;
492  break;
493  case REORDER_BUFFER_CHANGE_INVALIDATION:
494  if (change->data.inval.invalidations)
495  pfree(change->data.inval.invalidations);
496  change->data.inval.invalidations = NULL;
497  break;
498  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
499  if (change->data.snapshot)
500  {
501  ReorderBufferFreeSnap(rb, change->data.snapshot);
502  change->data.snapshot = NULL;
503  }
504  break;
505  /* no data in addition to the struct itself */
506  case REORDER_BUFFER_CHANGE_TRUNCATE:
507  if (change->data.truncate.relids != NULL)
508  {
509  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
510  change->data.truncate.relids = NULL;
511  }
512  break;
513  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
514  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
515  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
516  break;
517  }
518 
519  pfree(change);
520 }
void pfree(void *pointer)
Definition: mcxt.c:1057
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * ,
Oid * relids 
)

Definition at line 579 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

580 {
581  pfree(relids);
582 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf * tuple 
)

Definition at line 548 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

549 {
550  pfree(tuple);
551 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * ,
XLogRecPtr  ptr 
)

Definition at line 979 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

980 {
981  rb->current_restart_decoding_lsn = ptr;
982 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2949 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2950 {
2951  ReorderBufferTXN *txn;
2952 
2953  txn = ReorderBufferTXNByXid(rb, xid, false,
2954  NULL, InvalidXLogRecPtr, false);
2955 
2956  /* transaction isn't known yet, ergo no snapshot */
2957  if (txn == NULL)
2958  return false;
2959 
2960  /* a known subtxn? operate on top-level txn instead */
2961  if (rbtxn_is_known_subxact(txn))
2962  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2963  NULL, InvalidXLogRecPtr, false);
2964 
2965  return txn->base_snapshot != NULL;
2966 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2932 of file reorderbuffer.c.

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

2933 {
2934  ReorderBufferTXN *txn;
2935 
2936  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2937  false);
2938  if (txn == NULL)
2939  return false;
2940 
2941  return rbtxn_has_catalog_changes(txn);
2942 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_has_catalog_changes(txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2908 of file reorderbuffer.c.

References RBTXN_HAS_CATALOG_CHANGES, ReorderBufferTXNByXid(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeHeapOp(), DecodeXactOp(), and SnapBuildProcessNewCid().

2910 {
2911  ReorderBufferTXN *txn;
2912 
2913  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2914 
2915  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
2916 
2917  /*
2918  * Mark top-level transaction as having catalog changes too if one of its
2919  * children has so that the ReorderBufferBuildTupleCidHash can
2920  * conveniently check just top-level transaction and decide whether to
2921  * build the hash table or not.
2922  */
2923  if (txn->toptxn != NULL)
2924  txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
2925 }
#define RBTXN_HAS_CATALOG_CHANGES
struct ReorderBufferTXN * toptxn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4079 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

4080 {
4081  DIR *logical_dir;
4082  struct dirent *logical_de;
4083 
4084  logical_dir = AllocateDir("pg_replslot");
4085  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4086  {
4087  if (strcmp(logical_de->d_name, ".") == 0 ||
4088  strcmp(logical_de->d_name, "..") == 0)
4089  continue;
4090 
4091  /* if it cannot be a slot, skip the directory */
4092  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4093  continue;
4094 
4095  /*
4096  * ok, has to be a surviving logical slot, iterate and delete
4097  * everything starting with xid-*
4098  */
4099  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4100  }
4101  FreeDir(logical_dir);
4102 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2614
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2680
char d_name[MAX_PATH]
Definition: dirent.h:15
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2732

Variable Documentation

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem

Definition at line 208 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().