PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_STREAMED   0x0008
 
#define RBTXN_HAS_TOAST_INSERT   0x0010
 
#define RBTXN_HAS_SPEC_INSERT   0x0020
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_has_toast_insert(txn)
 
#define rbtxn_has_spec_insert(txn)
 
#define rbtxn_has_incomplete_tuple(txn)
 
#define rbtxn_is_streamed(txn)
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *, bool)
 
OidReorderBufferGetRelids (ReorderBuffer *, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 

Macro Definition Documentation

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 162 of file reorderbuffer.h.

Referenced by ReorderBufferXidSetCatalogChanges().

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 170 of file reorderbuffer.h.

Referenced by ReorderBufferBuildTupleCidHash(), and ReorderBufferXidHasCatalogChanges().

◆ rbtxn_has_incomplete_tuple

#define rbtxn_has_incomplete_tuple (   txn)
Value:
( \
rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
)
#define rbtxn_has_spec_insert(txn)

Definition at line 202 of file reorderbuffer.h.

Referenced by ReorderBufferLargestTopTXN(), and ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_SPEC_INSERT

#define RBTXN_HAS_SPEC_INSERT   0x0020

Definition at line 167 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_spec_insert

#define rbtxn_has_spec_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
)
#define RBTXN_HAS_SPEC_INSERT

Definition at line 196 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ RBTXN_HAS_TOAST_INSERT

#define RBTXN_HAS_TOAST_INSERT   0x0010

Definition at line 166 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_has_toast_insert

#define rbtxn_has_toast_insert (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
)
#define RBTXN_HAS_TOAST_INSERT

Definition at line 188 of file reorderbuffer.h.

Referenced by ReorderBufferProcessPartialChange().

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 164 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN(), and ReorderBufferTruncateTXN().

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 182 of file reorderbuffer.h.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNInit(), ReorderBufferProcessPartialChange(), and ReorderBufferTruncateTXN().

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0008

Definition at line 165 of file reorderbuffer.h.

Referenced by ReorderBufferTruncateTXN().

◆ rbtxn_is_streamed

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 163 of file reorderbuffer.h.

Referenced by ReorderBufferAssignChild().

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 369 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 372 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 378 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 385 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 389 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 394 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 414 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 426 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 420 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 433 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 402 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 408 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 442 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 54 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2468 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2469 {
2470  ReorderBufferTXN *txn;
2471 
2472  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2473  false);
2474 
2475  /* unknown, nothing to remove */
2476  if (txn == NULL)
2477  return;
2478 
2479  /* For streamed transactions notify the remote node about the abort. */
2480  if (rbtxn_is_streamed(txn))
2481  {
2482  rb->stream_abort(rb, txn, lsn);
2483 
2484  /*
2485  * We might have decoded changes for this transaction that could load
2486  * the cache as per the current transaction's view (consider DDL's
2487  * happened in this transaction). We don't want the decoding of future
2488  * transactions to use those cache entries so execute invalidations.
2489  */
2490  if (txn->ninvalidations > 0)
2492  txn->invalidations);
2493  }
2494 
2495  /* cosmetic... */
2496  txn->final_lsn = lsn;
2497 
2498  /* remove potential on-disk data, and deallocate */
2499  ReorderBufferCleanupTXN(rb, txn);
2500 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 2510 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2511 {
2512  dlist_mutable_iter it;
2513 
2514  /*
2515  * Iterate through all (potential) toplevel TXNs and abort all that are
2516  * older than what possibly can be running. Once we've found the first
2517  * that is alive we stop, there might be some that acquired an xid earlier
2518  * but started writing later, but it's unlikely and they will be cleaned
2519  * up in a later call to this function.
2520  */
2521  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2522  {
2523  ReorderBufferTXN *txn;
2524 
2525  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2526 
2527  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2528  {
2529  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2530 
2531  /* remove potential on-disk data, and deallocate this tx */
2532  ReorderBufferCleanupTXN(rb, txn);
2533  }
2534  else
2535  return;
2536  }
2537 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 2811 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

2814 {
2815  ReorderBufferTXN *txn;
2816 
2817  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2818 
2819  /*
2820  * We collect all the invalidations under the top transaction so that we
2821  * can execute them all together.
2822  */
2823  if (txn->toptxn)
2824  txn = txn->toptxn;
2825 
2826  Assert(nmsgs > 0);
2827 
2828  /* Accumulate invalidations. */
2829  if (txn->ninvalidations == 0)
2830  {
2831  txn->ninvalidations = nmsgs;
2833  MemoryContextAlloc(rb->context,
2834  sizeof(SharedInvalidationMessage) * nmsgs);
2835  memcpy(txn->invalidations, msgs,
2836  sizeof(SharedInvalidationMessage) * nmsgs);
2837  }
2838  else
2839  {
2842  (txn->ninvalidations + nmsgs));
2843 
2844  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2845  nmsgs * sizeof(SharedInvalidationMessage));
2846  txn->ninvalidations += nmsgs;
2847  }
2848 }
struct ReorderBufferTXN * toptxn
#define Assert(condition)
Definition: c.h:745
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2691 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2693 {
2695 
2696  change->data.command_id = cid;
2698 
2699  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2700 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@99 data

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2778 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2782 {
2784  ReorderBufferTXN *txn;
2785 
2786  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2787 
2788  change->data.tuplecid.node = node;
2789  change->data.tuplecid.tid = tid;
2790  change->data.tuplecid.cmin = cmin;
2791  change->data.tuplecid.cmax = cmax;
2792  change->data.tuplecid.combocid = combocid;
2793  change->lsn = lsn;
2794  change->txn = txn;
2796 
2797  dlist_push_tail(&txn->tuplecids, &change->node);
2798  txn->ntuplecids++;
2799 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@99::@103 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 298 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
308  "ReorderBuffer",
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
326  sizeof(ReorderBufferTXN));
327 
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
331 
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338 
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
347 
348  dlist_init(&buffer->toplevel_by_lsn);
350 
351  /*
352  * Ensure there's no stale data from prior uses of this slot, in case some
353  * prior exit avoided calling ReorderBufferFree. Failure to do this can
354  * produce duplicated txns, and it's very cheap if there's nothing there.
355  */
357 
358  return buffer;
359 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
uint32 TransactionId
Definition: c.h:520
MemoryContext hcxt
Definition: hsearch.h:77
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:72
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:71
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:745
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define NameStr(name)
Definition: c.h:622
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 978 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

980 {
981  ReorderBufferTXN *txn;
982  ReorderBufferTXN *subtxn;
983  bool new_top;
984  bool new_sub;
985 
986  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
987  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
988 
989  if (!new_sub)
990  {
991  if (rbtxn_is_known_subxact(subtxn))
992  {
993  /* already associated, nothing to do */
994  return;
995  }
996  else
997  {
998  /*
999  * We already saw this transaction, but initially added it to the
1000  * list of top-level txns. Now that we know it's not top-level,
1001  * remove it from there.
1002  */
1003  dlist_delete(&subtxn->node);
1004  }
1005  }
1006 
1007  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1008  subtxn->toplevel_xid = xid;
1009  Assert(subtxn->nsubtxns == 0);
1010 
1011  /* set the reference to top-level transaction */
1012  subtxn->toptxn = txn;
1013 
1014  /* add to subtransaction list */
1015  dlist_push_tail(&txn->subtxns, &subtxn->node);
1016  txn->nsubtxns++;
1017 
1018  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1019  ReorderBufferTransferSnapToParent(txn, subtxn);
1020 
1021  /* Verify LSN-ordering invariant */
1022  AssertTXNLsnOrder(rb);
1023 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:745
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2399 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2403 {
2404  ReorderBufferTXN *txn;
2405  Snapshot snapshot_now;
2406  CommandId command_id = FirstCommandId;
2407 
2408  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2409  false);
2410 
2411  /* unknown transaction, nothing to replay */
2412  if (txn == NULL)
2413  return;
2414 
2415  txn->final_lsn = commit_lsn;
2416  txn->end_lsn = end_lsn;
2417  txn->commit_time = commit_time;
2418  txn->origin_id = origin_id;
2419  txn->origin_lsn = origin_lsn;
2420 
2421  /*
2422  * If the transaction was (partially) streamed, we need to commit it in a
2423  * 'streamed' way. That is, we first stream the remaining part of the
2424  * transaction, and then invoke stream_commit message.
2425  *
2426  * Called after everything (origin ID, LSN, ...) is stored in the
2427  * transaction to avoid passing that information directly.
2428  */
2429  if (rbtxn_is_streamed(txn))
2430  {
2431  ReorderBufferStreamCommit(rb, txn);
2432  return;
2433  }
2434 
2435  /*
2436  * If this transaction has no snapshot, it didn't make any changes to the
2437  * database, so there's nothing to decode. Note that
2438  * ReorderBufferCommitChild will have transferred any snapshots from
2439  * subtransactions if there were any.
2440  */
2441  if (txn->base_snapshot == NULL)
2442  {
2443  Assert(txn->ninvalidations == 0);
2444  ReorderBufferCleanupTXN(rb, txn);
2445  return;
2446  }
2447 
2448  snapshot_now = txn->base_snapshot;
2449 
2450  /* Process and send the changes to output plugin. */
2451  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2452  command_id, false);
2453 }
uint32 CommandId
Definition: c.h:534
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:536
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:745
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1098 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1101 {
1102  ReorderBufferTXN *subtxn;
1103 
1104  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1105  InvalidXLogRecPtr, false);
1106 
1107  /*
1108  * No need to do anything if that subtxn didn't contain any changes
1109  */
1110  if (!subtxn)
1111  return;
1112 
1113  subtxn->final_lsn = commit_lsn;
1114  subtxn->end_lsn = end_lsn;
1115 
1116  /*
1117  * Assign this subxact as a child of the toplevel xact (no-op if already
1118  * done.)
1119  */
1120  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1121 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 2553 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2554 {
2555  ReorderBufferTXN *txn;
2556 
2557  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2558  false);
2559 
2560  /* unknown, nothing to forget */
2561  if (txn == NULL)
2562  return;
2563 
2564  /* For streamed transactions notify the remote node about the abort. */
2565  if (rbtxn_is_streamed(txn))
2566  rb->stream_abort(rb, txn, lsn);
2567 
2568  /* cosmetic... */
2569  txn->final_lsn = lsn;
2570 
2571  /*
2572  * Process cache invalidation messages if there are any. Even if we're not
2573  * interested in the transaction's contents, it could have manipulated the
2574  * catalog and we need to update the caches according to that.
2575  */
2576  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2578  txn->invalidations);
2579  else
2580  Assert(txn->ninvalidations == 0);
2581 
2582  /* remove potential on-disk data, and deallocate */
2583  ReorderBufferCleanupTXN(rb, txn);
2584 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:745
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer )

Definition at line 365 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

366 {
367  MemoryContext context = rb->context;
368 
369  /*
370  * We free separately allocated data by entirely scrapping reorderbuffer's
371  * memory context.
372  */
373  MemoryContextDelete(context);
374 
375  /* Free disk space used by unconsumed reorder buffers */
377 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
ReplicationSlotPersistentData data
Definition: slot.h:143
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:622
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer )

Definition at line 436 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

437 {
438  ReorderBufferChange *change;
439 
440  change = (ReorderBufferChange *)
441  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
442 
443  memset(change, 0, sizeof(ReorderBufferChange));
444  return change;
445 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer )

Definition at line 923 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

924 {
925  ReorderBufferTXN *txn;
926 
927  AssertTXNLsnOrder(rb);
928 
929  if (dlist_is_empty(&rb->toplevel_by_lsn))
930  return NULL;
931 
932  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
933 
936  return txn;
937 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 951 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

952 {
953  ReorderBufferTXN *txn;
954 
955  AssertTXNLsnOrder(rb);
956 
958  return InvalidTransactionId;
959 
960  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
962  return txn->base_snapshot->xmin;
963 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer ,
int  nrelids 
)

Definition at line 550 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

551 {
552  Oid *relids;
553  Size alloc_len;
554 
555  alloc_len = sizeof(Oid) * nrelids;
556 
557  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
558 
559  return relids;
560 }
unsigned int Oid
Definition: postgres_ext.h:31
size_t Size
Definition: c.h:473
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer ,
Size  tuple_len 
)

Definition at line 514 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

515 {
516  ReorderBufferTupleBuf *tuple;
517  Size alloc_len;
518 
519  alloc_len = tuple_len + SizeofHeapTupleHeader;
520 
521  tuple = (ReorderBufferTupleBuf *)
522  MemoryContextAlloc(rb->tup_context,
523  sizeof(ReorderBufferTupleBuf) +
524  MAXIMUM_ALIGNOF + alloc_len);
525  tuple->alloc_tuple_size = alloc_len;
526  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
527 
528  return tuple;
529 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:473
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer ,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2593 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), and ReorderBufferForget().

2595 {
2596  bool use_subtxn = IsTransactionOrTransactionBlock();
2597  int i;
2598 
2599  if (use_subtxn)
2600  BeginInternalSubTransaction("replay");
2601 
2602  /*
2603  * Force invalidations to happen outside of a valid transaction - that way
2604  * entries will just be marked as invalid without accessing the catalog.
2605  * That's advantageous because we don't need to setup the full state
2606  * necessary for catalog access.
2607  */
2608  if (use_subtxn)
2610 
2611  for (i = 0; i < ninvalidations; i++)
2612  LocalExecuteInvalidationMessage(&invalidations[i]);
2613 
2614  if (use_subtxn)
2616 }
void AbortCurrentTransaction(void)
Definition: xact.c:3211
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4513
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4408
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2629 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2630 {
2631  /* many records won't have an xid assigned, centralize check here */
2632  if (xid != InvalidTransactionId)
2633  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2634 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange ,
bool  toast_insert 
)

Definition at line 732 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

734 {
735  ReorderBufferTXN *txn;
736 
737  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
738 
739  /*
740  * While streaming the previous changes we have detected that the
741  * transaction is aborted. So there is no point in collecting further
742  * changes for it.
743  */
744  if (txn->concurrent_abort)
745  {
746  /*
747  * We don't need to update memory accounting for this change as we
748  * have not added it to the queue yet.
749  */
750  ReorderBufferReturnChange(rb, change, false);
751  return;
752  }
753 
754  change->lsn = lsn;
755  change->txn = txn;
756 
757  Assert(InvalidXLogRecPtr != lsn);
758  dlist_push_tail(&txn->changes, &change->node);
759  txn->nentries++;
760  txn->nentries_mem++;
761 
762  /* update memory accounting information */
763  ReorderBufferChangeMemoryUpdate(rb, change, true);
764 
765  /* process partial change */
766  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
767 
768  /* check the memory limits and evict something if needed */
770 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:745
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 776 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

780 {
781  if (transactional)
782  {
783  MemoryContext oldcontext;
784  ReorderBufferChange *change;
785 
787 
788  oldcontext = MemoryContextSwitchTo(rb->context);
789 
790  change = ReorderBufferGetChange(rb);
792  change->data.msg.prefix = pstrdup(prefix);
793  change->data.msg.message_size = message_size;
794  change->data.msg.message = palloc(message_size);
795  memcpy(change->data.msg.message, message, message_size);
796 
797  ReorderBufferQueueChange(rb, xid, lsn, change, false);
798 
799  MemoryContextSwitchTo(oldcontext);
800  }
801  else
802  {
803  ReorderBufferTXN *txn = NULL;
804  volatile Snapshot snapshot_now = snapshot;
805 
806  if (xid != InvalidTransactionId)
807  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
808 
809  /* setup snapshot to allow catalog access */
810  SetupHistoricSnapshot(snapshot_now, NULL);
811  PG_TRY();
812  {
813  rb->message(rb, txn, lsn, false, prefix, message_size, message);
814 
816  }
817  PG_CATCH();
818  {
820  PG_RE_THROW();
821  }
822  PG_END_TRY();
823  }
824 }
struct ReorderBufferChange::@99::@102 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2047
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@99 data
#define PG_CATCH()
Definition: elog.h:305
#define Assert(condition)
Definition: c.h:745
#define PG_RE_THROW()
Definition: elog.h:336
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2031
#define PG_TRY()
Definition: elog.h:295
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer ,
ReorderBufferChange ,
bool   
)

Definition at line 451 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

453 {
454  /* update memory accounting info */
455  if (upd_mem)
456  ReorderBufferChangeMemoryUpdate(rb, change, false);
457 
458  /* free contained data */
459  switch (change->action)
460  {
465  if (change->data.tp.newtuple)
466  {
467  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
468  change->data.tp.newtuple = NULL;
469  }
470 
471  if (change->data.tp.oldtuple)
472  {
473  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
474  change->data.tp.oldtuple = NULL;
475  }
476  break;
478  if (change->data.msg.prefix != NULL)
479  pfree(change->data.msg.prefix);
480  change->data.msg.prefix = NULL;
481  if (change->data.msg.message != NULL)
482  pfree(change->data.msg.message);
483  change->data.msg.message = NULL;
484  break;
486  if (change->data.snapshot)
487  {
488  ReorderBufferFreeSnap(rb, change->data.snapshot);
489  change->data.snapshot = NULL;
490  }
491  break;
492  /* no data in addition to the struct itself */
494  if (change->data.truncate.relids != NULL)
495  {
496  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
497  change->data.truncate.relids = NULL;
498  }
499  break;
503  break;
504  }
505 
506  pfree(change);
507 }
void pfree(void *pointer)
Definition: mcxt.c:1057
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer ,
Oid relids 
)

Definition at line 566 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

567 {
568  pfree(relids);
569 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer ,
ReorderBufferTupleBuf tuple 
)

Definition at line 535 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

536 {
537  pfree(tuple);
538 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer ,
XLogRecPtr  ptr 
)

Definition at line 966 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

967 {
968  rb->current_restart_decoding_lsn = ptr;
969 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 2908 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2909 {
2910  ReorderBufferTXN *txn;
2911 
2912  txn = ReorderBufferTXNByXid(rb, xid, false,
2913  NULL, InvalidXLogRecPtr, false);
2914 
2915  /* transaction isn't known yet, ergo no snapshot */
2916  if (txn == NULL)
2917  return false;
2918 
2919  /* a known subtxn? operate on top-level txn instead */
2920  if (rbtxn_is_known_subxact(txn))
2921  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2922  NULL, InvalidXLogRecPtr, false);
2923 
2924  return txn->base_snapshot != NULL;
2925 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 2891 of file reorderbuffer.c.

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

2892 {
2893  ReorderBufferTXN *txn;
2894 
2895  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2896  false);
2897  if (txn == NULL)
2898  return false;
2899 
2900  return rbtxn_has_catalog_changes(txn);
2901 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_has_catalog_changes(txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2867 of file reorderbuffer.c.

References RBTXN_HAS_CATALOG_CHANGES, ReorderBufferTXNByXid(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeHeapOp(), DecodeXactOp(), and SnapBuildProcessNewCid().

2869 {
2870  ReorderBufferTXN *txn;
2871 
2872  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2873 
2875 
2876  /*
2877  * Mark top-level transaction as having catalog changes too if one of its
2878  * children has so that the ReorderBufferBuildTupleCidHash can
2879  * conveniently check just top-level transaction and decide whether to
2880  * build the hash table or not.
2881  */
2882  if (txn->toptxn != NULL)
2884 }
#define RBTXN_HAS_CATALOG_CHANGES
struct ReorderBufferTXN * toptxn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 3973 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

3974 {
3975  DIR *logical_dir;
3976  struct dirent *logical_de;
3977 
3978  logical_dir = AllocateDir("pg_replslot");
3979  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
3980  {
3981  if (strcmp(logical_de->d_name, ".") == 0 ||
3982  strcmp(logical_de->d_name, "..") == 0)
3983  continue;
3984 
3985  /* if it cannot be a slot, skip the directory */
3986  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
3987  continue;
3988 
3989  /*
3990  * ok, has to be a surviving logical slot, iterate and delete
3991  * everything starting with xid-*
3992  */
3994  }
3995  FreeDir(logical_dir);
3996 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:175
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2583
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2649
char d_name[MAX_PATH]
Definition: dirent.h:15
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2701

Variable Documentation

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem

Definition at line 208 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().