PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_PARTIAL_CHANGE   0x0020
 
#define RBTXN_PREPARE   0x0040
 
#define RBTXN_SKIPPED_PREPARE   0x0080
 
#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_partial_change(txn)
 
#define rbtxn_has_streamable_change(txn)
 
#define rbtxn_is_streamed(txn)
 
#define rbtxn_prepared(txn)
 
#define rbtxn_skip_prepared(txn)
 
#define rbtxn_is_toptxn(txn)
 
#define rbtxn_is_subtxn(txn)
 
#define rbtxn_get_toptxn(txn)
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef enum ReorderBufferChangeType ReorderBufferChangeType
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
 

Enumerations

enum  DebugLogicalRepStreamingMode { DEBUG_LOGICAL_REP_STREAMING_BUFFERED , DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE }
 
enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT , REORDER_BUFFER_CHANGE_UPDATE , REORDER_BUFFER_CHANGE_DELETE , REORDER_BUFFER_CHANGE_MESSAGE ,
  REORDER_BUFFER_CHANGE_INVALIDATION , REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT , REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID , REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT , REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 
PGDLLIMPORT int debug_logical_replication_streaming
 

Macro Definition Documentation

◆ rbtxn_get_toptxn

#define rbtxn_get_toptxn (   txn)
Value:
( \
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
)

Definition at line 265 of file reorderbuffer.h.

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 180 of file reorderbuffer.h.

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 191 of file reorderbuffer.h.

◆ RBTXN_HAS_PARTIAL_CHANGE

#define RBTXN_HAS_PARTIAL_CHANGE   0x0020

Definition at line 185 of file reorderbuffer.h.

◆ rbtxn_has_partial_change

#define rbtxn_has_partial_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
#define RBTXN_HAS_PARTIAL_CHANGE

Definition at line 215 of file reorderbuffer.h.

◆ RBTXN_HAS_STREAMABLE_CHANGE

#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100

Definition at line 188 of file reorderbuffer.h.

◆ rbtxn_has_streamable_change

#define rbtxn_has_streamable_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
)
#define RBTXN_HAS_STREAMABLE_CHANGE

Definition at line 221 of file reorderbuffer.h.

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define RBTXN_IS_SUBXACT

Definition at line 197 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 182 of file reorderbuffer.h.

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 203 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 183 of file reorderbuffer.h.

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 209 of file reorderbuffer.h.

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 184 of file reorderbuffer.h.

◆ rbtxn_is_streamed

#define rbtxn_is_streamed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
#define RBTXN_IS_STREAMED

Definition at line 235 of file reorderbuffer.h.

◆ rbtxn_is_subtxn

#define rbtxn_is_subtxn (   txn)
Value:
( \
(txn)->toptxn != NULL \
)

Definition at line 259 of file reorderbuffer.h.

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 181 of file reorderbuffer.h.

◆ rbtxn_is_toptxn

#define rbtxn_is_toptxn (   txn)
Value:
( \
(txn)->toptxn == NULL \
)

Definition at line 253 of file reorderbuffer.h.

◆ RBTXN_PREPARE

#define RBTXN_PREPARE   0x0040

Definition at line 186 of file reorderbuffer.h.

◆ rbtxn_prepared

#define rbtxn_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
#define RBTXN_PREPARE

Definition at line 241 of file reorderbuffer.h.

◆ rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE

Definition at line 247 of file reorderbuffer.h.

◆ RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0080

Definition at line 187 of file reorderbuffer.h.

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)     ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Definition at line 47 of file reorderbuffer.h.

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 22 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 444 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 450 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 457 of file reorderbuffer.h.

◆ ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 474 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferChangeType

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 461 of file reorderbuffer.h.

◆ ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 483 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 466 of file reorderbuffer.h.

◆ ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 478 of file reorderbuffer.h.

◆ ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 488 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 506 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 524 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 518 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 531 of file reorderbuffer.h.

◆ ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 512 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 494 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 500 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 540 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

◆ ReorderBufferUpdateProgressTxnCB

typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)

Definition at line 548 of file reorderbuffer.h.

Enumeration Type Documentation

◆ DebugLogicalRepStreamingMode

Enumerator
DEBUG_LOGICAL_REP_STREAMING_BUFFERED 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE 

Definition at line 25 of file reorderbuffer.h.

26 {
DebugLogicalRepStreamingMode
Definition: reorderbuffer.h:26
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:28
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:27

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 63 of file reorderbuffer.h.

64 {
ReorderBufferChangeType
Definition: reorderbuffer.h:64
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:69
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:74
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:65
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:68
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:75
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:71
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:72
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:73
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:76
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:67
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:70
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:66

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2891 of file reorderbuffer.c.

2893 {
2894  ReorderBufferTXN *txn;
2895 
2896  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2897  false);
2898 
2899  /* unknown, nothing to remove */
2900  if (txn == NULL)
2901  return;
2902 
2903  txn->xact_time.abort_time = abort_time;
2904 
2905  /* For streamed transactions notify the remote node about the abort. */
2906  if (rbtxn_is_streamed(txn))
2907  {
2908  rb->stream_abort(rb, txn, lsn);
2909 
2910  /*
2911  * We might have decoded changes for this transaction that could load
2912  * the cache as per the current transaction's view (consider DDL's
2913  * happened in this transaction). We don't want the decoding of future
2914  * transactions to use those cache entries so execute invalidations.
2915  */
2916  if (txn->ninvalidations > 0)
2917  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2918  txn->invalidations);
2919  }
2920 
2921  /* cosmetic... */
2922  txn->final_lsn = lsn;
2923 
2924  /* remove potential on-disk data, and deallocate */
2925  ReorderBufferCleanupTXN(rb, txn);
2926 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@106 xact_time
ReorderBufferStreamAbortCB stream_abort
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 2936 of file reorderbuffer.c.

2937 {
2938  dlist_mutable_iter it;
2939 
2940  /*
2941  * Iterate through all (potential) toplevel TXNs and abort all that are
2942  * older than what possibly can be running. Once we've found the first
2943  * that is alive we stop, there might be some that acquired an xid earlier
2944  * but started writing later, but it's unlikely and they will be cleaned
2945  * up in a later call to this function.
2946  */
2947  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2948  {
2949  ReorderBufferTXN *txn;
2950 
2951  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2952 
2953  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2954  {
2955  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2956 
2957  /* Notify the remote node about the crash/immediate restart. */
2958  if (rbtxn_is_streamed(txn))
2959  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2960 
2961  /* remove potential on-disk data, and deallocate this tx */
2962  ReorderBufferCleanupTXN(rb, txn);
2963  }
2964  else
2965  return;
2966  }
2967 }
#define DEBUG2
Definition: elog.h:29
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
TransactionId xid
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog(), InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3269 of file reorderbuffer.c.

3272 {
3273  ReorderBufferTXN *txn;
3274  MemoryContext oldcontext;
3275  ReorderBufferChange *change;
3276 
3277  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3278 
3279  oldcontext = MemoryContextSwitchTo(rb->context);
3280 
3281  /*
3282  * Collect all the invalidations under the top transaction, if available,
3283  * so that we can execute them all together. See comments atop this
3284  * function.
3285  */
3286  txn = rbtxn_get_toptxn(txn);
3287 
3288  Assert(nmsgs > 0);
3289 
3290  /* Accumulate invalidations. */
3291  if (txn->ninvalidations == 0)
3292  {
3293  txn->ninvalidations = nmsgs;
3294  txn->invalidations = (SharedInvalidationMessage *)
3295  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3296  memcpy(txn->invalidations, msgs,
3297  sizeof(SharedInvalidationMessage) * nmsgs);
3298  }
3299  else
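3300  {
3301  txn->invalidations = (SharedInvalidationMessage *)
3302  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3303  (txn->ninvalidations + nmsgs));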
3304 
3305  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3306  nmsgs * sizeof(SharedInvalidationMessage));
3307  txn->ninvalidations += nmsgs;
3308  }
3309 
3310  change = ReorderBufferGetChange(rb);
3311  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3312  change->data.inval.ninvalidations = nmsgs;
3313  change->data.inval.invalidations = (SharedInvalidationMessage *)
3314  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3315  memcpy(change->data.inval.invalidations, msgs,
3316  sizeof(SharedInvalidationMessage) * nmsgs);
3317 
3318  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3319 
3320  MemoryContextSwitchTo(oldcontext);
3321 }
Assert(fmt[strlen(fmt) - 1] !='\n')
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1476
void * palloc(Size size)
Definition: mcxt.c:1226
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
ReorderBufferChangeType action
Definition: reorderbuffer.h:94
union ReorderBufferChange::@100 data
struct ReorderBufferChange::@100::@105 inval
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3232 of file reorderbuffer.c.

3236 {
3237  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3238  ReorderBufferTXN *txn;
3239 
3240  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3241 
3242  change->data.tuplecid.locator = locator;
3243  change->data.tuplecid.tid = tid;
3244  change->data.tuplecid.cmin = cmin;
3245  change->data.tuplecid.cmax = cmax;
3246  change->data.tuplecid.combocid = combocid;
3247  change->lsn = lsn;
3248  change->txn = txn;
3249  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3250 
3251  dlist_push_tail(&txn->tuplecids, &change->node);
3252  txn->ntuplecids++;
3253 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:97
struct ReorderBufferChange::@100::@104 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 304 of file reorderbuffer.c.

305 {
306  ReorderBuffer *buffer;
307  HASHCTL hash_ctl;
308  MemoryContext new_ctx;
309 
310  Assert(MyReplicationSlot != NULL);
311 
312  /* allocate memory in own context, to have better accountability */
313  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
314  "ReorderBuffer",
315  ALLOCSET_DEFAULT_SIZES);
316 
317  buffer =
318  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
319 
320  memset(&hash_ctl, 0, sizeof(hash_ctl));
321 
322  buffer->context = new_ctx;
323 
324  buffer->change_context = SlabContextCreate(new_ctx,
325  "Change",
326  SLAB_DEFAULT_BLOCK_SIZE,
327  sizeof(ReorderBufferChange));
328 
329  buffer->txn_context = SlabContextCreate(new_ctx,
330  "TXN",
331  SLAB_DEFAULT_BLOCK_SIZE,
332  sizeof(ReorderBufferTXN));
333 
334  /*
335  * XXX the allocation sizes used below pre-date generation context's block
336  * growing code. These values should likely be benchmarked and set to
337  * more suitable values.
338  */
339  buffer->tup_context = GenerationContextCreate(new_ctx,
340  "Tuples",
341  SLAB_LARGE_BLOCK_SIZE,
342  SLAB_LARGE_BLOCK_SIZE,
343  SLAB_LARGE_BLOCK_SIZE);
344 
345  hash_ctl.keysize = sizeof(TransactionId);
346  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
347  hash_ctl.hcxt = buffer->context;
348 
349  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
350  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
351 
352  buffer->by_txn_last_xid = InvalidTransactionId;
353  buffer->by_txn_last_txn = NULL;
354 
355  buffer->outbuf = NULL;
356  buffer->outbufsize = 0;
357  buffer->size = 0;
358 
359  buffer->spillTxns = 0;
360  buffer->spillCount = 0;
361  buffer->spillBytes = 0;
362  buffer->streamTxns = 0;
363  buffer->streamCount = 0;
364  buffer->streamBytes = 0;
365  buffer->totalTxns = 0;
366  buffer->totalBytes = 0;
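367 
368  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
369 
370  dlist_init(&buffer->toplevel_by_lsn);
371  dlist_init(&buffer->txns_by_base_snapshot_lsn);
372  dclist_init(&buffer->catchange_txns);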
373 
374  /*
375  * Ensure there's no stale data from prior uses of this slot, in case some
376  * prior exit avoided calling ReorderBufferFree. Failure to do this can
377  * produce duplicated txns, and it's very cheap if there's nothing there.
378  */
379  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
380 
381  return buffer;
382 }
#define NameStr(name)
Definition: c.h:735
uint32 TransactionId
Definition: c.h:641
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:157
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:182
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:183
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:162
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1064 of file reorderbuffer.c.

1066 {
1067  ReorderBufferTXN *txn;
1068  ReorderBufferTXN *subtxn;
1069  bool new_top;
1070  bool new_sub;
1071 
1072  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1073  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1074 
1075  if (!new_sub)
1076  {
1077  if (rbtxn_is_known_subxact(subtxn))
1078  {
1079  /* already associated, nothing to do */
1080  return;
1081  }
1082  else
1083  {
1084  /*
1085  * We already saw this transaction, but initially added it to the
1086  * list of top-level txns. Now that we know it's not top-level,
1087  * remove it from there.
1088  */
1089  dlist_delete(&subtxn->node);
1090  }
1091  }
1092 
1093  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1094  subtxn->toplevel_xid = xid;
1095  Assert(subtxn->nsubtxns == 0);
1096 
1097  /* set the reference to top-level transaction */
1098  subtxn->toptxn = txn;
1099 
1100  /* add to subtransaction list */
1101  dlist_push_tail(&txn->subtxns, &subtxn->node);
1102  txn->nsubtxns++;
1103 
1104  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1105  ReorderBufferTransferSnapToParent(txn, subtxn);
1106 
1107  /* Verify LSN-ordering invariant */
1108  AssertTXNLsnOrder(rb);
1109 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2700 of file reorderbuffer.c.

2704 {
2705  ReorderBufferTXN *txn;
2706 
2707  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2708  false);
2709 
2710  /* unknown transaction, nothing to replay */
2711  if (txn == NULL)
2712  return;
2713 
2714  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2715  origin_id, origin_lsn);
2716 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1184 of file reorderbuffer.c.

1187 {
1188  ReorderBufferTXN *subtxn;
1189 
1190  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1191  InvalidXLogRecPtr, false);
1192 
1193  /*
1194  * No need to do anything if that subtxn didn't contain any changes
1195  */
1196  if (!subtxn)
1197  return;
1198 
1199  subtxn->final_lsn = commit_lsn;
1200  subtxn->end_lsn = end_lsn;
1201 
1202  /*
1203  * Assign this subxact as a child of the toplevel xact (no-op if already
1204  * done.)
1205  */
1206  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1207 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2806 of file reorderbuffer.c.

2811 {
2812  ReorderBufferTXN *txn;
2813  XLogRecPtr prepare_end_lsn;
2814  TimestampTz prepare_time;
2815 
2816  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2817 
2818  /* unknown transaction, nothing to do */
2819  if (txn == NULL)
2820  return;
2821 
2822  /*
2823  * By this time the txn has the prepare record information, remember it to
2824  * be later used for rollback.
2825  */
2826  prepare_end_lsn = txn->end_lsn;
2827  prepare_time = txn->xact_time.prepare_time;
2828 
2829  /* add the gid in the txn */
2830  txn->gid = pstrdup(gid);
2831 
2832  /*
2833  * It is possible that this transaction is not decoded at prepare time
2834  * either because by that time we didn't have a consistent snapshot, or
2835  * two_phase was not enabled, or it was decoded earlier but we have
2836  * restarted. We only need to send the prepare if it was not decoded
2837  * earlier. We don't need to decode the xact for aborts if it is not done
2838  * already.
2839  */
2840  if ((txn->final_lsn < two_phase_at) && is_commit)
2841  {
2842  txn->txn_flags |= RBTXN_PREPARE;
2843 
2844  /*
2845  * The prepare info must have been updated in txn even if we skip
2846  * prepare.
2847  */
2848  Assert(txn->final_lsn != InvalidXLogRecPtr);
2849 
2850  /*
2851  * By this time the txn has the prepare record information and it is
2852  * important to use that so that downstream gets the accurate
2853  * information. If instead, we have passed commit information here
2854  * then downstream can behave as it has already replayed commit
2855  * prepared after the restart.
2856  */
2857  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2858  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2859  }
2860 
2861  txn->final_lsn = commit_lsn;
2862  txn->end_lsn = end_lsn;
2863  txn->xact_time.commit_time = commit_time;
2864  txn->origin_id = origin_id;
2865  txn->origin_lsn = origin_lsn;
2866 
2867  if (is_commit)
2868  rb->commit_prepared(rb, txn, commit_lsn);
2869  else
2870  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2871 
2872  /* cleanup: make sure there's no cache pollution */
2873  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2874  txn->invalidations);
2875  ReorderBufferCleanupTXN(rb, txn);
2876 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1644
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2983 of file reorderbuffer.c.

2984 {
2985  ReorderBufferTXN *txn;
2986 
2987  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2988  false);
2989 
2990  /* unknown, nothing to forget */
2991  if (txn == NULL)
2992  return;
2993 
2994  /* this transaction mustn't be streamed */
2995  Assert(!rbtxn_is_streamed(txn));
2996 
2997  /* cosmetic... */
2998  txn->final_lsn = lsn;
2999 
3000  /*
3001  * Process cache invalidation messages if there are any. Even if we're not
3002  * interested in the transaction's contents, it could have manipulated the
3003  * catalog and we need to update the caches according to that.
3004  */
3005  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3006  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3007  txn->invalidations);
3008  else
3009  Assert(txn->ninvalidations == 0);
3010 
3011  /* remove potential on-disk data, and deallocate */
3012  ReorderBufferCleanupTXN(rb, txn);
3013 }
Snapshot base_snapshot

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * rb)

Definition at line 388 of file reorderbuffer.c.

389 {
390  MemoryContext context = rb->context;
391 
392  /*
393  * We free separately allocated data by entirely scrapping reorderbuffer's
394  * memory context.
395  */
396  MemoryContextDelete(context);
397 
398  /* Free disk space used by unconsumed reorder buffers */
399  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
400 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer * rb)

Definition at line 3378 of file reorderbuffer.c.

3379 {
3380  dlist_iter iter;
3381  TransactionId *xids = NULL;
3382  size_t xcnt = 0;
3383 
3384  /* Quick return if the list is empty */
3385  if (dclist_count(&rb->catchange_txns) == 0)
3386  return NULL;
3387 
3388  /* Initialize XID array */
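3389  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3390  dclist_count(&rb->catchange_txns));
3391  dclist_foreach(iter, &rb->catchange_txns)
3392  {
3393  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3394  catchange_node,
3395  iter.cur);
3396 
3397  Assert(rbtxn_has_catalog_changes(txn));
3398 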
3399  xids[xcnt++] = txn->xid;
3400  }
3401 
3402  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3403 
3404  Assert(xcnt == dclist_count(&rb->catchange_txns));
3405  return xids;
3406 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
#define qsort(a, b, c, d)
Definition: port.h:445
#define rbtxn_has_catalog_changes(txn)
dlist_node * cur
Definition: ilist.h:179
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * rb)

Definition at line 1009 of file reorderbuffer.c.

1010 {
1011  ReorderBufferTXN *txn;
1012 
1013  AssertTXNLsnOrder(rb);
1014 
1015  if (dlist_is_empty(&rb->toplevel_by_lsn))
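1016  return NULL;
1017 
1018  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1019 
1020  Assert(!rbtxn_is_known_subxact(txn));
1021  Assert(txn->first_lsn != InvalidXLogRecPtr);
1022  return txn;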
1023 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
XLogRecPtr first_lsn

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 1037 of file reorderbuffer.c.

1038 {
1039  ReorderBufferTXN *txn;
1040 
1041  AssertTXNLsnOrder(rb);
1042 
1043  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1044  return InvalidTransactionId;
1045 
1046  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1047  &rb->txns_by_base_snapshot_lsn);
1048  return txn->base_snapshot->xmin;
1049 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * rb,
int  nrelids 
)

Definition at line 590 of file reorderbuffer.c.

591 {
592  Oid *relids;
593  Size alloc_len;
594 
595  alloc_len = sizeof(Oid) * nrelids;
596 
597  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
598 
599  return relids;
600 }
size_t Size
Definition: c.h:594
unsigned int Oid
Definition: postgres_ext.h:31

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * rb,
Size  tuple_len 
)

Definition at line 554 of file reorderbuffer.c.

555 {
556  ReorderBufferTupleBuf *tuple;
557  Size alloc_len;
558 
559  alloc_len = tuple_len + SizeofHeapTupleHeader;
560 
561  tuple = (ReorderBufferTupleBuf *)
562  MemoryContextAlloc(rb->tup_context,
563  sizeof(ReorderBufferTupleBuf) +
564  MAXIMUM_ALIGNOF + alloc_len);
565  tuple->alloc_tuple_size = alloc_len;
566  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
567 
568  return tuple;
569 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:47
HeapTupleHeader t_data
Definition: htup.h:68
HeapTupleData tuple
Definition: reorderbuffer.h:38

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * rb,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 3055 of file reorderbuffer.c.

3057 {
3058  bool use_subtxn = IsTransactionOrTransactionBlock();
3059  int i;
3060 
3061  if (use_subtxn)
3062  BeginInternalSubTransaction("replay");
3063 
3064  /*
3065  * Force invalidations to happen outside of a valid transaction - that way
3066  * entries will just be marked as invalid without accessing the catalog.
3067  * That's advantageous because we don't need to setup the full state
3068  * necessary for catalog access.
3069  */
3070  if (use_subtxn)
3071  AbortCurrentTransaction();
3072 
3073  for (i = 0; i < ninvalidations; i++)
3074  LocalExecuteInvalidationMessage(&invalidations[i]);
3075 
3076  if (use_subtxn)
3077  RollbackAndReleaseCurrentSubTransaction();
3078 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:615
int i
Definition: isn.c:73
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4834
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4540
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4645
void AbortCurrentTransaction(void)
Definition: xact.c:3305

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3024 of file reorderbuffer.c.

3025 {
3026  ReorderBufferTXN *txn;
3027 
3028  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3029  false);
3030 
3031  /* unknown, nothing to do */
3032  if (txn == NULL)
3033  return;
3034 
3035  /*
3036  * Process cache invalidation messages if there are any. Even if we're not
3037  * interested in the transaction's contents, it could have manipulated the
3038  * catalog and we need to update the caches according to that.
3039  */
3040  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3041  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3042  txn->invalidations);
3043  else
3044  Assert(txn->ninvalidations == 0);
3045 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2769 of file reorderbuffer.c.

2771 {
2772  ReorderBufferTXN *txn;
2773 
2774  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2775  false);
2776 
2777  /* unknown transaction, nothing to replay */
2778  if (txn == NULL)
2779  return;
2780 
2781  txn->txn_flags |= RBTXN_PREPARE;
2782  txn->gid = pstrdup(gid);
2783 
2784  /* The prepare info must have been updated in txn by now. */
2785  Assert(txn->final_lsn != InvalidXLogRecPtr);
2786 
2787  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2788  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2789 
2790  /*
2791  * We send the prepare for the concurrently aborted xacts so that later
2792  * when rollback prepared is decoded and sent, the downstream should be
2793  * able to rollback such a xact. See comments atop DecodePrepare.
2794  *
2795  * Note, for the concurrent_abort + streaming case a stream_prepare was
2796  * already sent within the ReorderBufferReplay call above.
2797  */
2798  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2799  rb->prepare(rb, txn, txn->final_lsn);
2800 }
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3091 of file reorderbuffer.c.

3092 {
3093  /* many records won't have an xid assigned, centralize check here */
3094  if (xid != InvalidTransactionId)
3095  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3096 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 775 of file reorderbuffer.c.

777 {
778  ReorderBufferTXN *txn;
779 
780  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
781 
782  /*
783  * While streaming the previous changes we have detected that the
784  * transaction is aborted. So there is no point in collecting further
785  * changes for it.
786  */
787  if (txn->concurrent_abort)
788  {
789  /*
790  * We don't need to update memory accounting for this change as we
791  * have not added it to the queue yet.
792  */
793  ReorderBufferReturnChange(rb, change, false);
794  return;
795  }
796 
797  /*
798  * The changes that are sent downstream are considered streamable. We
799  * remember such transactions so that only those will later be considered
800  * for streaming.
801  */
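802  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
803  change->action == REORDER_BUFFER_CHANGE_UPDATE ||
804  change->action == REORDER_BUFFER_CHANGE_DELETE ||
805  change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
806  change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
807  change->action == REORDER_BUFFER_CHANGE_MESSAGE)
808  {
809  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
810 
811  toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
812  }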
813 
814  change->lsn = lsn;
815  change->txn = txn;
816 
817  Assert(InvalidXLogRecPtr != lsn);
818  dlist_push_tail(&txn->changes, &change->node);
819  txn->nentries++;
820  txn->nentries_mem++;
821 
822  /* update memory accounting information */
823  ReorderBufferChangeMemoryUpdate(rb, change, true,
824  ReorderBufferChangeSize(change));
825 
826  /* process partial change */
827  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
828 
829  /* check the memory limits and evict something if needed */
830  ReorderBufferCheckMemoryLimit(rb);
831 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
dlist_head changes

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 838 of file reorderbuffer.c.

842 {
843  if (transactional)
844  {
845  MemoryContext oldcontext;
846  ReorderBufferChange *change;
847 
848  Assert(xid != InvalidTransactionId);
849 
850  /*
851  * We don't expect snapshots for transactional changes - we'll use the
852  * snapshot derived later during apply (unless the change gets
853  * skipped).
854  */
855  Assert(!snap);
856 
857  oldcontext = MemoryContextSwitchTo(rb->context);
858 
859  change = ReorderBufferGetChange(rb);
860  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
861  change->data.msg.prefix = pstrdup(prefix);
862  change->data.msg.message_size = message_size;
863  change->data.msg.message = palloc(message_size);
864  memcpy(change->data.msg.message, message, message_size);
865 
866  ReorderBufferQueueChange(rb, xid, lsn, change, false);
867 
868  MemoryContextSwitchTo(oldcontext);
869  }
870  else
871  {
872  ReorderBufferTXN *txn = NULL;
873  volatile Snapshot snapshot_now = snap;
874 
875  /* Non-transactional changes require a valid snapshot. */
876  Assert(snapshot_now);
877 
878  if (xid != InvalidTransactionId)
879  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
880 
881  /* setup snapshot to allow catalog access */
882  SetupHistoricSnapshot(snapshot_now, NULL);
883  PG_TRY();
884  {
885  rb->message(rb, txn, lsn, false, prefix, message_size, message);
886 
887  TeardownHistoricSnapshot(false);
888  }
889  PG_CATCH();
890  {
891  TeardownHistoricSnapshot(true);
892  PG_RE_THROW();
893  }
894  PG_END_TRY();
895  }
896 }
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1640
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1624
struct ReorderBufferChange::@100::@103 msg
ReorderBufferMessageCB message

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2722 of file reorderbuffer.c.

2726 {
2727  ReorderBufferTXN *txn;
2728 
2729  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2730 
2731  /* unknown transaction, nothing to do */
2732  if (txn == NULL)
2733  return false;
2734 
2735  /*
2736  * Remember the prepare information to be later used by commit prepared in
2737  * case we skip doing prepare.
2738  */
2739  txn->final_lsn = prepare_lsn;
2740  txn->end_lsn = end_lsn;
2741  txn->xact_time.prepare_time = prepare_time;
2742  txn->origin_id = origin_id;
2743  txn->origin_lsn = origin_lsn;
2744 
2745  return true;
2746 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * rb,
ReorderBufferChange *  change,
bool  upd_mem 
)

Definition at line 484 of file reorderbuffer.c.

486 {
487  /* update memory accounting info */
488  if (upd_mem)
489  ReorderBufferChangeMemoryUpdate(rb, change, false,
490  ReorderBufferChangeSize(change));
491 
492  /* free contained data */
493  switch (change->action)
494  {
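495  case REORDER_BUFFER_CHANGE_INSERT:
496  case REORDER_BUFFER_CHANGE_UPDATE:
497  case REORDER_BUFFER_CHANGE_DELETE:
498  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
499  if (change->data.tp.newtuple)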
500  {
501  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
502  change->data.tp.newtuple = NULL;
503  }
504 
505  if (change->data.tp.oldtuple)
506  {
507  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
508  change->data.tp.oldtuple = NULL;
509  }
510  break;
511  case REORDER_BUFFER_CHANGE_MESSAGE:
512  if (change->data.msg.prefix != NULL)
513  pfree(change->data.msg.prefix);
514  change->data.msg.prefix = NULL;
515  if (change->data.msg.message != NULL)
516  pfree(change->data.msg.message);
517  change->data.msg.message = NULL;
518  break;
519  case REORDER_BUFFER_CHANGE_INVALIDATION:
520  if (change->data.inval.invalidations)
521  pfree(change->data.inval.invalidations);
522  change->data.inval.invalidations = NULL;
523  break;
524  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
525  if (change->data.snapshot)
526  {
527  ReorderBufferFreeSnap(rb, change->data.snapshot);
528  change->data.snapshot = NULL;
529  }
530  break;
531  /* no data in addition to the struct itself */
532  case REORDER_BUFFER_CHANGE_TRUNCATE:
533  if (change->data.truncate.relids != NULL)
534  {
535  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
536  change->data.truncate.relids = NULL;
537  }
538  break;
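539  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
540  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
541  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
542  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
543  break;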
544  }
545 
546  pfree(change);
547 }
void pfree(void *pointer)
Definition: mcxt.c:1456
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
struct ReorderBufferChange::@100::@101 tp
struct ReorderBufferChange::@100::@102 truncate

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * rb,
Oid *  relids 
)

Definition at line 606 of file reorderbuffer.c.

607 {
608  pfree(relids);
609 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * rb,
ReorderBufferTupleBuf *  tuple 
)

Definition at line 575 of file reorderbuffer.c.

576 {
577  pfree(tuple);
578 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3122 of file reorderbuffer.c.

3124 {
3125  ReorderBufferTXN *txn;
3126  bool is_new;
3127 
3128  Assert(snap != NULL);
3129 
3130  /*
3131  * Fetch the transaction to operate on. If we know it's a subtransaction,
3132  * operate on its top-level transaction instead.
3133  */
3134  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3135  if (rbtxn_is_known_subxact(txn))
3136  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3137  NULL, InvalidXLogRecPtr, false);
3138  Assert(txn->base_snapshot == NULL);
3139 
3140  txn->base_snapshot = snap;
3141  txn->base_snapshot_lsn = lsn;
3142  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3143 
3144  AssertTXNLsnOrder(rb);
3145 }
XLogRecPtr base_snapshot_lsn
dlist_node base_snapshot_node

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * rb,
XLogRecPtr  ptr 
)

Definition at line 1052 of file reorderbuffer.c.

1053 {
1054  rb->current_restart_decoding_lsn = ptr;
1055 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 2750 of file reorderbuffer.c.

2751 {
2752  ReorderBufferTXN *txn;
2753 
2754  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2755 
2756  /* unknown transaction, nothing to do */
2757  if (txn == NULL)
2758  return;
2759 
2760  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2761 }

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 3430 of file reorderbuffer.c.

3431 {
3432  ReorderBufferTXN *txn;
3433 
3434  txn = ReorderBufferTXNByXid(rb, xid, false,
3435  NULL, InvalidXLogRecPtr, false);
3436 
3437  /* transaction isn't known yet, ergo no snapshot */
3438  if (txn == NULL)
3439  return false;
3440 
3441  /* a known subtxn? operate on top-level txn instead */
3442  if (rbtxn_is_known_subxact(txn))
3443  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3444  NULL, InvalidXLogRecPtr, false);
3445 
3446  return txn->base_snapshot != NULL;
3447 }

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 3413 of file reorderbuffer.c.

3414 {
3415  ReorderBufferTXN *txn;
3416 
3417  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3418  false);
3419  if (txn == NULL)
3420  return false;
3421 
3422  return rbtxn_has_catalog_changes(txn);
3423 }

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3340 of file reorderbuffer.c.

3342 {
3343  ReorderBufferTXN *txn;
3344 
3345  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3346 
3347  if (!rbtxn_has_catalog_changes(txn))
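3348  {
3349  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3350  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3351  }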
3352 
3353  /*
3354  * Mark top-level transaction as having catalog changes too if one of its
3355  * children has so that the ReorderBufferBuildTupleCidHash can
3356  * conveniently check just top-level transaction and decide whether to
3357  * build the hash table or not.
3358  */
3359  if (rbtxn_is_subtxn(txn))
3360  {
3361  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3362 
3363  if (!rbtxn_has_catalog_changes(toptxn))
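3364  {
3365  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3366  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3367  }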
3368  }
3369 }
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
dlist_node catchange_node

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by heap_decode(), SnapBuildProcessNewCid(), and xact_decode().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4591 of file reorderbuffer.c.

4592 {
4593  DIR *logical_dir;
4594  struct dirent *logical_de;
4595 
4596  logical_dir = AllocateDir("pg_replslot");
4597  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4598  {
4599  if (strcmp(logical_de->d_name, ".") == 0 ||
4600  strcmp(logical_de->d_name, "..") == 0)
4601  continue;
4602 
4603  /* if it cannot be a slot, skip the directory */
4604  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4605  continue;
4606 
4607  /*
4608  * ok, has to be a surviving logical slot, iterate and delete
4609  * everything starting with xid-*
4610  */
4611  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4612  }
4613  FreeDir(logical_dir);
4614 }
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2854
int FreeDir(DIR *dir)
Definition: fd.c:2906
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2788
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:199
DIR
Definition: dirent.c:26
dirent
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

Variable Documentation

◆ debug_logical_replication_streaming

PGDLLIMPORT int debug_logical_replication_streaming
extern

Definition at line 213 of file reorderbuffer.c.

Referenced by pa_send_data(), and ReorderBufferCheckMemoryLimit().

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem
extern

Definition at line 209 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().