PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define PG_LOGICAL_DIR   "pg_logical"
 
#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"
 
#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_PARTIAL_CHANGE   0x0020
 
#define RBTXN_PREPARE   0x0040
 
#define RBTXN_SKIPPED_PREPARE   0x0080
 
#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_partial_change(txn)
 
#define rbtxn_has_streamable_change(txn)
 
#define rbtxn_is_streamed(txn)
 
#define rbtxn_prepared(txn)
 
#define rbtxn_skip_prepared(txn)
 
#define rbtxn_is_toptxn(txn)
 
#define rbtxn_is_subtxn(txn)
 
#define rbtxn_get_toptxn(txn)
 

Typedefs

typedef enum ReorderBufferChangeType ReorderBufferChangeType
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
 

Enumerations

enum  DebugLogicalRepStreamingMode { DEBUG_LOGICAL_REP_STREAMING_BUFFERED , DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE }
 
enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT , REORDER_BUFFER_CHANGE_UPDATE , REORDER_BUFFER_CHANGE_DELETE , REORDER_BUFFER_CHANGE_MESSAGE ,
  REORDER_BUFFER_CHANGE_INVALIDATION , REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT , REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID , REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT , REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
TransactionIdReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 
PGDLLIMPORT int debug_logical_replication_streaming
 

Macro Definition Documentation

◆ PG_LOGICAL_DIR

#define PG_LOGICAL_DIR   "pg_logical"

Definition at line 22 of file reorderbuffer.h.

◆ PG_LOGICAL_MAPPINGS_DIR

#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"

Definition at line 23 of file reorderbuffer.h.

◆ PG_LOGICAL_SNAPSHOTS_DIR

#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"

Definition at line 24 of file reorderbuffer.h.

◆ rbtxn_get_toptxn

#define rbtxn_get_toptxn (   txn)
Value:
( \
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
)

Definition at line 252 of file reorderbuffer.h.

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 167 of file reorderbuffer.h.

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 178 of file reorderbuffer.h.

◆ RBTXN_HAS_PARTIAL_CHANGE

#define RBTXN_HAS_PARTIAL_CHANGE   0x0020

Definition at line 172 of file reorderbuffer.h.

◆ rbtxn_has_partial_change

#define rbtxn_has_partial_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
#define RBTXN_HAS_PARTIAL_CHANGE

Definition at line 202 of file reorderbuffer.h.

◆ RBTXN_HAS_STREAMABLE_CHANGE

#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100

Definition at line 175 of file reorderbuffer.h.

◆ rbtxn_has_streamable_change

#define rbtxn_has_streamable_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
)
#define RBTXN_HAS_STREAMABLE_CHANGE

Definition at line 208 of file reorderbuffer.h.

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define RBTXN_IS_SUBXACT

Definition at line 184 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 169 of file reorderbuffer.h.

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 190 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 170 of file reorderbuffer.h.

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 196 of file reorderbuffer.h.

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 171 of file reorderbuffer.h.

◆ rbtxn_is_streamed

#define rbtxn_is_streamed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
#define RBTXN_IS_STREAMED

Definition at line 222 of file reorderbuffer.h.

◆ rbtxn_is_subtxn

#define rbtxn_is_subtxn (   txn)
Value:
( \
(txn)->toptxn != NULL \
)

Definition at line 246 of file reorderbuffer.h.

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 168 of file reorderbuffer.h.

◆ rbtxn_is_toptxn

#define rbtxn_is_toptxn (   txn)
Value:
( \
(txn)->toptxn == NULL \
)

Definition at line 240 of file reorderbuffer.h.

◆ RBTXN_PREPARE

#define RBTXN_PREPARE   0x0040

Definition at line 173 of file reorderbuffer.h.

◆ rbtxn_prepared

#define rbtxn_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
#define RBTXN_PREPARE

Definition at line 228 of file reorderbuffer.h.

◆ rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE

Definition at line 234 of file reorderbuffer.h.

◆ RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0080

Definition at line 174 of file reorderbuffer.h.

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 28 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 435 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 441 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 448 of file reorderbuffer.h.

◆ ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 465 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferChangeType

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 452 of file reorderbuffer.h.

◆ ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 474 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 457 of file reorderbuffer.h.

◆ ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 469 of file reorderbuffer.h.

◆ ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 479 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 497 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 515 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 509 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 522 of file reorderbuffer.h.

◆ ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 503 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 485 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 491 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 531 of file reorderbuffer.h.

◆ ReorderBufferTXN

◆ ReorderBufferUpdateProgressTxnCB

typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)

Definition at line 539 of file reorderbuffer.h.

Enumeration Type Documentation

◆ DebugLogicalRepStreamingMode

Enumerator
DEBUG_LOGICAL_REP_STREAMING_BUFFERED 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE 

Definition at line 31 of file reorderbuffer.h.

32 {
DebugLogicalRepStreamingMode
Definition: reorderbuffer.h:32
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 50 of file reorderbuffer.h.

51 {
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:60
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2931 of file reorderbuffer.c.

2933 {
2934  ReorderBufferTXN *txn;
2935 
2936  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2937  false);
2938 
2939  /* unknown, nothing to remove */
2940  if (txn == NULL)
2941  return;
2942 
2943  txn->xact_time.abort_time = abort_time;
2944 
2945  /* For streamed transactions notify the remote node about the abort. */
2946  if (rbtxn_is_streamed(txn))
2947  {
2948  rb->stream_abort(rb, txn, lsn);
2949 
2950  /*
2951  * We might have decoded changes for this transaction that could load
2952  * the cache as per the current transaction's view (consider DDL's
2953  * happened in this transaction). We don't want the decoding of future
2954  * transactions to use those cache entries so execute invalidations.
2955  */
2956  if (txn->ninvalidations > 0)
2958  txn->invalidations);
2959  }
2960 
2961  /* cosmetic... */
2962  txn->final_lsn = lsn;
2963 
2964  /* remove potential on-disk data, and deallocate */
2965  ReorderBufferCleanupTXN(rb, txn);
2966 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@114 xact_time
ReorderBufferStreamAbortCB stream_abort
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2976 of file reorderbuffer.c.

2977 {
2978  dlist_mutable_iter it;
2979 
2980  /*
2981  * Iterate through all (potential) toplevel TXNs and abort all that are
2982  * older than what possibly can be running. Once we've found the first
2983  * that is alive we stop, there might be some that acquired an xid earlier
2984  * but started writing later, but it's unlikely and they will be cleaned
2985  * up in a later call to this function.
2986  */
2988  {
2989  ReorderBufferTXN *txn;
2990 
2991  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2992 
2993  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2994  {
2995  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2996 
2997  /* Notify the remote node about the crash/immediate restart. */
2998  if (rbtxn_is_streamed(txn))
2999  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3000 
3001  /* remove potential on-disk data, and deallocate this tx */
3002  ReorderBufferCleanupTXN(rb, txn);
3003  }
3004  else
3005  return;
3006  }
3007 }
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:225
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
TransactionId xid
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3329 of file reorderbuffer.c.

3332 {
3333  ReorderBufferTXN *txn;
3334  MemoryContext oldcontext;
3335  ReorderBufferChange *change;
3336 
3337  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3338 
3339  oldcontext = MemoryContextSwitchTo(rb->context);
3340 
3341  /*
3342  * Collect all the invalidations under the top transaction, if available,
3343  * so that we can execute them all together. See comments atop this
3344  * function.
3345  */
3346  txn = rbtxn_get_toptxn(txn);
3347 
3348  Assert(nmsgs > 0);
3349 
3350  /* Accumulate invalidations. */
3351  if (txn->ninvalidations == 0)
3352  {
3353  txn->ninvalidations = nmsgs;
3355  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3356  memcpy(txn->invalidations, msgs,
3357  sizeof(SharedInvalidationMessage) * nmsgs);
3358  }
3359  else
3360  {
3363  (txn->ninvalidations + nmsgs));
3364 
3365  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3366  nmsgs * sizeof(SharedInvalidationMessage));
3367  txn->ninvalidations += nmsgs;
3368  }
3369 
3370  change = ReorderBufferGetChange(rb);
3372  change->data.inval.ninvalidations = nmsgs;
3373  change->data.inval.invalidations = (SharedInvalidationMessage *)
3374  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3375  memcpy(change->data.inval.invalidations, msgs,
3376  sizeof(SharedInvalidationMessage) * nmsgs);
3377 
3378  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3379 
3380  MemoryContextSwitchTo(oldcontext);
3381 }
#define Assert(condition)
Definition: c.h:861
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContextSwitchTo(old_ctx)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@108 data
struct ReorderBufferChange::@108::@113 inval
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3292 of file reorderbuffer.c.

3296 {
3298  ReorderBufferTXN *txn;
3299 
3300  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3301 
3302  change->data.tuplecid.locator = locator;
3303  change->data.tuplecid.tid = tid;
3304  change->data.tuplecid.cmin = cmin;
3305  change->data.tuplecid.cmax = cmax;
3306  change->data.tuplecid.combocid = combocid;
3307  change->lsn = lsn;
3308  change->txn = txn;
3310 
3311  dlist_push_tail(&txn->tuplecids, &change->node);
3312  txn->ntuplecids++;
3313 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
struct ReorderBufferChange::@108::@112 tuplecid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 309 of file reorderbuffer.c.

310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
319  "ReorderBuffer",
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * To minimize memory fragmentation caused by long-running transactions
341  * with changes spanning multiple memory blocks, we use a single
342  * fixed-size memory block for decoded tuple storage. The performance
343  * testing showed that the default memory block size maintains logical
344  * decoding performance without causing fragmentation due to concurrent
345  * transactions. One might think that we can use the max size as
346  * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
347  * the memory fragmentation.
348  */
349  buffer->tup_context = GenerationContextCreate(new_ctx,
350  "Tuples",
354 
355  hash_ctl.keysize = sizeof(TransactionId);
356  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
357  hash_ctl.hcxt = buffer->context;
358 
359  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
361 
363  buffer->by_txn_last_txn = NULL;
364 
365  buffer->outbuf = NULL;
366  buffer->outbufsize = 0;
367  buffer->size = 0;
368 
369  /* txn_heap is ordered by transaction size */
371 
372  buffer->spillTxns = 0;
373  buffer->spillCount = 0;
374  buffer->spillBytes = 0;
375  buffer->streamTxns = 0;
376  buffer->streamCount = 0;
377  buffer->streamBytes = 0;
378  buffer->totalTxns = 0;
379  buffer->totalBytes = 0;
380 
382 
383  dlist_init(&buffer->toplevel_by_lsn);
385  dclist_init(&buffer->catchange_txns);
386 
387  /*
388  * Ensure there's no stale data from prior uses of this slot, in case some
389  * prior exit avoided calling ReorderBufferFree. Failure to do this can
390  * produce duplicated txns, and it's very cheap if there's nothing there.
391  */
393 
394  return buffer;
395 }
#define NameStr(name)
Definition: c.h:749
uint32 TransactionId
Definition: c.h:655
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:181
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1077 of file reorderbuffer.c.

1079 {
1080  ReorderBufferTXN *txn;
1081  ReorderBufferTXN *subtxn;
1082  bool new_top;
1083  bool new_sub;
1084 
1085  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1086  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1087 
1088  if (!new_sub)
1089  {
1090  if (rbtxn_is_known_subxact(subtxn))
1091  {
1092  /* already associated, nothing to do */
1093  return;
1094  }
1095  else
1096  {
1097  /*
1098  * We already saw this transaction, but initially added it to the
1099  * list of top-level txns. Now that we know it's not top-level,
1100  * remove it from there.
1101  */
1102  dlist_delete(&subtxn->node);
1103  }
1104  }
1105 
1106  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1107  subtxn->toplevel_xid = xid;
1108  Assert(subtxn->nsubtxns == 0);
1109 
1110  /* set the reference to top-level transaction */
1111  subtxn->toptxn = txn;
1112 
1113  /* add to subtransaction list */
1114  dlist_push_tail(&txn->subtxns, &subtxn->node);
1115  txn->nsubtxns++;
1116 
1117  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1118  ReorderBufferTransferSnapToParent(txn, subtxn);
1119 
1120  /* Verify LSN-ordering invariant */
1121  AssertTXNLsnOrder(rb);
1122 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2740 of file reorderbuffer.c.

2744 {
2745  ReorderBufferTXN *txn;
2746 
2747  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2748  false);
2749 
2750  /* unknown transaction, nothing to replay */
2751  if (txn == NULL)
2752  return;
2753 
2754  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2755  origin_id, origin_lsn);
2756 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1197 of file reorderbuffer.c.

1200 {
1201  ReorderBufferTXN *subtxn;
1202 
1203  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1204  InvalidXLogRecPtr, false);
1205 
1206  /*
1207  * No need to do anything if that subtxn didn't contain any changes
1208  */
1209  if (!subtxn)
1210  return;
1211 
1212  subtxn->final_lsn = commit_lsn;
1213  subtxn->end_lsn = end_lsn;
1214 
1215  /*
1216  * Assign this subxact as a child of the toplevel xact (no-op if already
1217  * done.)
1218  */
1219  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1220 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2846 of file reorderbuffer.c.

2851 {
2852  ReorderBufferTXN *txn;
2853  XLogRecPtr prepare_end_lsn;
2854  TimestampTz prepare_time;
2855 
2856  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2857 
2858  /* unknown transaction, nothing to do */
2859  if (txn == NULL)
2860  return;
2861 
2862  /*
2863  * By this time the txn has the prepare record information, remember it to
2864  * be later used for rollback.
2865  */
2866  prepare_end_lsn = txn->end_lsn;
2867  prepare_time = txn->xact_time.prepare_time;
2868 
2869  /* add the gid in the txn */
2870  txn->gid = pstrdup(gid);
2871 
2872  /*
2873  * It is possible that this transaction is not decoded at prepare time
2874  * either because by that time we didn't have a consistent snapshot, or
2875  * two_phase was not enabled, or it was decoded earlier but we have
2876  * restarted. We only need to send the prepare if it was not decoded
2877  * earlier. We don't need to decode the xact for aborts if it is not done
2878  * already.
2879  */
2880  if ((txn->final_lsn < two_phase_at) && is_commit)
2881  {
2882  txn->txn_flags |= RBTXN_PREPARE;
2883 
2884  /*
2885  * The prepare info must have been updated in txn even if we skip
2886  * prepare.
2887  */
2889 
2890  /*
2891  * By this time the txn has the prepare record information and it is
2892  * important to use that so that downstream gets the accurate
2893  * information. If instead, we have passed commit information here
2894  * then downstream can behave as it has already replayed commit
2895  * prepared after the restart.
2896  */
2897  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2898  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2899  }
2900 
2901  txn->final_lsn = commit_lsn;
2902  txn->end_lsn = end_lsn;
2903  txn->xact_time.commit_time = commit_time;
2904  txn->origin_id = origin_id;
2905  txn->origin_lsn = origin_lsn;
2906 
2907  if (is_commit)
2908  rb->commit_prepared(rb, txn, commit_lsn);
2909  else
2910  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2911 
2912  /* cleanup: make sure there's no cache pollution */
2914  txn->invalidations);
2915  ReorderBufferCleanupTXN(rb, txn);
2916 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3023 of file reorderbuffer.c.

3024 {
3025  ReorderBufferTXN *txn;
3026 
3027  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3028  false);
3029 
3030  /* unknown, nothing to forget */
3031  if (txn == NULL)
3032  return;
3033 
3034  /* this transaction mustn't be streamed */
3035  Assert(!rbtxn_is_streamed(txn));
3036 
3037  /* cosmetic... */
3038  txn->final_lsn = lsn;
3039 
3040  /*
3041  * Process cache invalidation messages if there are any. Even if we're not
3042  * interested in the transaction's contents, it could have manipulated the
3043  * catalog and we need to update the caches according to that.
3044  */
3045  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3047  txn->invalidations);
3048  else
3049  Assert(txn->ninvalidations == 0);
3050 
3051  /* remove potential on-disk data, and deallocate */
3052  ReorderBufferCleanupTXN(rb, txn);
3053 }
Snapshot base_snapshot

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 401 of file reorderbuffer.c.

402 {
404 
405  /*
406  * We free separately allocated data by entirely scrapping reorderbuffer's
407  * memory context.
408  */
410 
411  /* Free disk space used by unconsumed reorder buffers */
413 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
tree context
Definition: radixtree.h:1835

References context, ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3438 of file reorderbuffer.c.

3439 {
3440  dlist_iter iter;
3441  TransactionId *xids = NULL;
3442  size_t xcnt = 0;
3443 
3444  /* Quick return if the list is empty */
3445  if (dclist_count(&rb->catchange_txns) == 0)
3446  return NULL;
3447 
3448  /* Initialize XID array */
3449  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3451  dclist_foreach(iter, &rb->catchange_txns)
3452  {
3454  catchange_node,
3455  iter.cur);
3456 
3458 
3459  xids[xcnt++] = txn->xid;
3460  }
3461 
3462  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3463 
3464  Assert(xcnt == dclist_count(&rb->catchange_txns));
3465  return xids;
3466 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
#define qsort(a, b, c, d)
Definition: port.h:447
#define rbtxn_has_catalog_changes(txn)
dlist_node * cur
Definition: ilist.h:179
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1022 of file reorderbuffer.c.

1023 {
1024  ReorderBufferTXN *txn;
1025 
1026  AssertTXNLsnOrder(rb);
1027 
1028  if (dlist_is_empty(&rb->toplevel_by_lsn))
1029  return NULL;
1030 
1032 
1035  return txn;
1036 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
XLogRecPtr first_lsn

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 1050 of file reorderbuffer.c.

1051 {
1052  ReorderBufferTXN *txn;
1053 
1054  AssertTXNLsnOrder(rb);
1055 
1057  return InvalidTransactionId;
1058 
1059  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1061  return txn->base_snapshot->xmin;
1062 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 603 of file reorderbuffer.c.

604 {
605  Oid *relids;
606  Size alloc_len;
607 
608  alloc_len = sizeof(Oid) * nrelids;
609 
610  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
611 
612  return relids;
613 }
size_t Size
Definition: c.h:608
unsigned int Oid
Definition: postgres_ext.h:31

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 570 of file reorderbuffer.c.

571 {
572  HeapTuple tuple;
573  Size alloc_len;
574 
575  alloc_len = tuple_len + SizeofHeapTupleHeader;
576 
578  HEAPTUPLESIZE + alloc_len);
579  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
580 
581  return tuple;
582 }
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3095 of file reorderbuffer.c.

3097 {
3098  bool use_subtxn = IsTransactionOrTransactionBlock();
3099  int i;
3100 
3101  if (use_subtxn)
3102  BeginInternalSubTransaction("replay");
3103 
3104  /*
3105  * Force invalidations to happen outside of a valid transaction - that way
3106  * entries will just be marked as invalid without accessing the catalog.
3107  * That's advantageous because we don't need to setup the full state
3108  * necessary for catalog access.
3109  */
3110  if (use_subtxn)
3112 
3113  for (i = 0; i < ninvalidations; i++)
3114  LocalExecuteInvalidationMessage(&invalidations[i]);
3115 
3116  if (use_subtxn)
3118 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:762
int i
Definition: isn.c:72
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4994
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4699
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4801
void AbortCurrentTransaction(void)
Definition: xact.c:3443

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3064 of file reorderbuffer.c.

3065 {
3066  ReorderBufferTXN *txn;
3067 
3068  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3069  false);
3070 
3071  /* unknown, nothing to do */
3072  if (txn == NULL)
3073  return;
3074 
3075  /*
3076  * Process cache invalidation messages if there are any. Even if we're not
3077  * interested in the transaction's contents, it could have manipulated the
3078  * catalog and we need to update the caches according to that.
3079  */
3080  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3082  txn->invalidations);
3083  else
3084  Assert(txn->ninvalidations == 0);
3085 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2809 of file reorderbuffer.c.

2811 {
2812  ReorderBufferTXN *txn;
2813 
2814  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2815  false);
2816 
2817  /* unknown transaction, nothing to replay */
2818  if (txn == NULL)
2819  return;
2820 
2821  txn->txn_flags |= RBTXN_PREPARE;
2822  txn->gid = pstrdup(gid);
2823 
2824  /* The prepare info must have been updated in txn by now. */
2826 
2827  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2828  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2829 
2830  /*
2831  * We send the prepare for the concurrently aborted xacts so that later
2832  * when rollback prepared is decoded and sent, the downstream should be
2833  * able to rollback such a xact. See comments atop DecodePrepare.
2834  *
2835  * Note, for the concurrent_abort + streaming case a stream_prepare was
2836  * already sent within the ReorderBufferReplay call above.
2837  */
2838  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2839  rb->prepare(rb, txn, txn->final_lsn);
2840 }
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3131 of file reorderbuffer.c.

3132 {
3133  /* many records won't have an xid assigned, centralize check here */
3134  if (xid != InvalidTransactionId)
3135  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3136 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 788 of file reorderbuffer.c.

790 {
791  ReorderBufferTXN *txn;
792 
793  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
794 
795  /*
796  * While streaming the previous changes we have detected that the
797  * transaction is aborted. So there is no point in collecting further
798  * changes for it.
799  */
800  if (txn->concurrent_abort)
801  {
802  /*
803  * We don't need to update memory accounting for this change as we
804  * have not added it to the queue yet.
805  */
806  ReorderBufferReturnChange(rb, change, false);
807  return;
808  }
809 
810  /*
811  * The changes that are sent downstream are considered streamable. We
812  * remember such transactions so that only those will later be considered
813  * for streaming.
814  */
815  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
821  {
822  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
823 
825  }
826 
827  change->lsn = lsn;
828  change->txn = txn;
829 
830  Assert(InvalidXLogRecPtr != lsn);
831  dlist_push_tail(&txn->changes, &change->node);
832  txn->nentries++;
833  txn->nentries_mem++;
834 
835  /* update memory accounting information */
836  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
837  ReorderBufferChangeSize(change));
838 
839  /* process partial change */
840  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
841 
842  /* check the memory limits and evict something if needed */
844 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
dlist_head changes

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 851 of file reorderbuffer.c.

855 {
856  if (transactional)
857  {
858  MemoryContext oldcontext;
859  ReorderBufferChange *change;
860 
862 
863  /*
864  * We don't expect snapshots for transactional changes - we'll use the
865  * snapshot derived later during apply (unless the change gets
866  * skipped).
867  */
868  Assert(!snap);
869 
870  oldcontext = MemoryContextSwitchTo(rb->context);
871 
872  change = ReorderBufferGetChange(rb);
874  change->data.msg.prefix = pstrdup(prefix);
875  change->data.msg.message_size = message_size;
876  change->data.msg.message = palloc(message_size);
877  memcpy(change->data.msg.message, message, message_size);
878 
879  ReorderBufferQueueChange(rb, xid, lsn, change, false);
880 
881  MemoryContextSwitchTo(oldcontext);
882  }
883  else
884  {
885  ReorderBufferTXN *txn = NULL;
886  volatile Snapshot snapshot_now = snap;
887 
888  /* Non-transactional changes require a valid snapshot. */
889  Assert(snapshot_now);
890 
891  if (xid != InvalidTransactionId)
892  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
893 
894  /* setup snapshot to allow catalog access */
895  SetupHistoricSnapshot(snapshot_now, NULL);
896  PG_TRY();
897  {
898  rb->message(rb, txn, lsn, false, prefix, message_size, message);
899 
901  }
902  PG_CATCH();
903  {
905  PG_RE_THROW();
906  }
907  PG_END_TRY();
908  }
909 }
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
struct ReorderBufferChange::@108::@111 msg
ReorderBufferMessageCB message

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2762 of file reorderbuffer.c.

2766 {
2767  ReorderBufferTXN *txn;
2768 
2769  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2770 
2771  /* unknown transaction, nothing to do */
2772  if (txn == NULL)
2773  return false;
2774 
2775  /*
2776  * Remember the prepare information to be later used by commit prepared in
2777  * case we skip doing prepare.
2778  */
2779  txn->final_lsn = prepare_lsn;
2780  txn->end_lsn = end_lsn;
2781  txn->xact_time.prepare_time = prepare_time;
2782  txn->origin_id = origin_id;
2783  txn->origin_lsn = origin_lsn;
2784 
2785  return true;
2786 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 500 of file reorderbuffer.c.

502 {
503  /* update memory accounting info */
504  if (upd_mem)
505  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
506  ReorderBufferChangeSize(change));
507 
508  /* free contained data */
509  switch (change->action)
510  {
515  if (change->data.tp.newtuple)
516  {
517  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
518  change->data.tp.newtuple = NULL;
519  }
520 
521  if (change->data.tp.oldtuple)
522  {
523  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
524  change->data.tp.oldtuple = NULL;
525  }
526  break;
528  if (change->data.msg.prefix != NULL)
529  pfree(change->data.msg.prefix);
530  change->data.msg.prefix = NULL;
531  if (change->data.msg.message != NULL)
532  pfree(change->data.msg.message);
533  change->data.msg.message = NULL;
534  break;
536  if (change->data.inval.invalidations)
537  pfree(change->data.inval.invalidations);
538  change->data.inval.invalidations = NULL;
539  break;
541  if (change->data.snapshot)
542  {
543  ReorderBufferFreeSnap(rb, change->data.snapshot);
544  change->data.snapshot = NULL;
545  }
546  break;
547  /* no data in addition to the struct itself */
549  if (change->data.truncate.relids != NULL)
550  {
551  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
552  change->data.truncate.relids = NULL;
553  }
554  break;
559  break;
560  }
561 
562  pfree(change);
563 }
void pfree(void *pointer)
Definition: mcxt.c:1521
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
struct ReorderBufferChange::@108::@110 truncate
struct ReorderBufferChange::@108::@109 tp

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 619 of file reorderbuffer.c.

620 {
621  pfree(relids);
622 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 588 of file reorderbuffer.c.

589 {
590  pfree(tuple);
591 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3162 of file reorderbuffer.c.

3164 {
3165  ReorderBufferTXN *txn;
3166  bool is_new;
3167 
3168  Assert(snap != NULL);
3169 
3170  /*
3171  * Fetch the transaction to operate on. If we know it's a subtransaction,
3172  * operate on its top-level transaction instead.
3173  */
3174  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3175  if (rbtxn_is_known_subxact(txn))
3176  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3177  NULL, InvalidXLogRecPtr, false);
3178  Assert(txn->base_snapshot == NULL);
3179 
3180  txn->base_snapshot = snap;
3181  txn->base_snapshot_lsn = lsn;
3183 
3184  AssertTXNLsnOrder(rb);
3185 }
XLogRecPtr base_snapshot_lsn
dlist_node base_snapshot_node

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1065 of file reorderbuffer.c.

1066 {
1067  rb->current_restart_decoding_lsn = ptr;
1068 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2790 of file reorderbuffer.c.

2791 {
2792  ReorderBufferTXN *txn;
2793 
2794  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2795 
2796  /* unknown transaction, nothing to do */
2797  if (txn == NULL)
2798  return;
2799 
2801 }

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3490 of file reorderbuffer.c.

3491 {
3492  ReorderBufferTXN *txn;
3493 
3494  txn = ReorderBufferTXNByXid(rb, xid, false,
3495  NULL, InvalidXLogRecPtr, false);
3496 
3497  /* transaction isn't known yet, ergo no snapshot */
3498  if (txn == NULL)
3499  return false;
3500 
3501  /* a known subtxn? operate on top-level txn instead */
3502  if (rbtxn_is_known_subxact(txn))
3503  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3504  NULL, InvalidXLogRecPtr, false);
3505 
3506  return txn->base_snapshot != NULL;
3507 }

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3473 of file reorderbuffer.c.

3474 {
3475  ReorderBufferTXN *txn;
3476 
3477  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3478  false);
3479  if (txn == NULL)
3480  return false;
3481 
3482  return rbtxn_has_catalog_changes(txn);
3483 }

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3400 of file reorderbuffer.c.

3402 {
3403  ReorderBufferTXN *txn;
3404 
3405  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3406 
3407  if (!rbtxn_has_catalog_changes(txn))
3408  {
3411  }
3412 
3413  /*
3414  * Mark top-level transaction as having catalog changes too if one of its
3415  * children has so that the ReorderBufferBuildTupleCidHash can
3416  * conveniently check just top-level transaction and decide whether to
3417  * build the hash table or not.
3418  */
3419  if (rbtxn_is_subtxn(txn))
3420  {
3421  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3422 
3423  if (!rbtxn_has_catalog_changes(toptxn))
3424  {
3427  }
3428  }
3429 }
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
dlist_node catchange_node

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4655 of file reorderbuffer.c.

4656 {
4657  DIR *logical_dir;
4658  struct dirent *logical_de;
4659 
4660  logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4661  while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4662  {
4663  if (strcmp(logical_de->d_name, ".") == 0 ||
4664  strcmp(logical_de->d_name, "..") == 0)
4665  continue;
4666 
4667  /* if it cannot be a slot, skip the directory */
4668  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4669  continue;
4670 
4671  /*
4672  * ok, has to be a surviving logical slot, iterate and delete
4673  * everything starting with xid-*
4674  */
4676  }
4677  FreeDir(logical_dir);
4678 }
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2931
int FreeDir(DIR *dir)
Definition: fd.c:2983
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2865
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

Variable Documentation

◆ debug_logical_replication_streaming

PGDLLIMPORT int debug_logical_replication_streaming
extern

Definition at line 216 of file reorderbuffer.c.

Referenced by pa_send_data(), and ReorderBufferCheckMemoryLimit().

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem
extern

Definition at line 212 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().