PostgreSQL Source Code git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define PG_LOGICAL_DIR   "pg_logical"
 
#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"
 
#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define RBTXN_IS_STREAMED   0x0010
 
#define RBTXN_HAS_PARTIAL_CHANGE   0x0020
 
#define RBTXN_IS_PREPARED   0x0040
 
#define RBTXN_SKIPPED_PREPARE   0x0080
 
#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100
 
#define RBTXN_SENT_PREPARE   0x0200
 
#define RBTXN_IS_COMMITTED   0x0400
 
#define RBTXN_IS_ABORTED   0x0800
 
#define RBTXN_PREPARE_STATUS_MASK   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 
#define rbtxn_is_serialized_clear(txn)
 
#define rbtxn_has_partial_change(txn)
 
#define rbtxn_has_streamable_change(txn)
 
#define rbtxn_is_streamed(txn)
 
#define rbtxn_is_prepared(txn)
 
#define rbtxn_sent_prepare(txn)
 
#define rbtxn_is_committed(txn)
 
#define rbtxn_is_aborted(txn)
 
#define rbtxn_skip_prepared(txn)
 
#define rbtxn_is_toptxn(txn)
 
#define rbtxn_is_subtxn(txn)
 
#define rbtxn_get_toptxn(txn)
 

Typedefs

typedef enum ReorderBufferChangeType ReorderBufferChangeType
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
 

Enumerations

enum  DebugLogicalRepStreamingMode { DEBUG_LOGICAL_REP_STREAMING_BUFFERED , DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE }
 
enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT , REORDER_BUFFER_CHANGE_UPDATE , REORDER_BUFFER_CHANGE_DELETE , REORDER_BUFFER_CHANGE_MESSAGE ,
  REORDER_BUFFER_CHANGE_INVALIDATION , REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT , REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID , REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM , REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT , REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 
PGDLLIMPORT int debug_logical_replication_streaming
 

Macro Definition Documentation

◆ PG_LOGICAL_DIR

#define PG_LOGICAL_DIR   "pg_logical"

Definition at line 22 of file reorderbuffer.h.

◆ PG_LOGICAL_MAPPINGS_DIR

#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"

Definition at line 23 of file reorderbuffer.h.

◆ PG_LOGICAL_SNAPSHOTS_DIR

#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"

Definition at line 24 of file reorderbuffer.h.

◆ rbtxn_get_toptxn

#define rbtxn_get_toptxn (   txn)
Value:
( \
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
)

Definition at line 281 of file reorderbuffer.h.

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 167 of file reorderbuffer.h.

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 183 of file reorderbuffer.h.

◆ RBTXN_HAS_PARTIAL_CHANGE

#define RBTXN_HAS_PARTIAL_CHANGE   0x0020

Definition at line 172 of file reorderbuffer.h.

◆ rbtxn_has_partial_change

#define rbtxn_has_partial_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
#define RBTXN_HAS_PARTIAL_CHANGE

Definition at line 207 of file reorderbuffer.h.

◆ RBTXN_HAS_STREAMABLE_CHANGE

#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100

Definition at line 175 of file reorderbuffer.h.

◆ rbtxn_has_streamable_change

#define rbtxn_has_streamable_change (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
)
#define RBTXN_HAS_STREAMABLE_CHANGE

Definition at line 213 of file reorderbuffer.h.

◆ RBTXN_IS_ABORTED

#define RBTXN_IS_ABORTED   0x0800

Definition at line 178 of file reorderbuffer.h.

◆ rbtxn_is_aborted

#define rbtxn_is_aborted (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
)
#define RBTXN_IS_ABORTED

Definition at line 257 of file reorderbuffer.h.

◆ RBTXN_IS_COMMITTED

#define RBTXN_IS_COMMITTED   0x0400

Definition at line 177 of file reorderbuffer.h.

◆ rbtxn_is_committed

#define rbtxn_is_committed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
)
#define RBTXN_IS_COMMITTED

Definition at line 251 of file reorderbuffer.h.

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define RBTXN_IS_SUBXACT

Definition at line 189 of file reorderbuffer.h.

◆ RBTXN_IS_PREPARED

#define RBTXN_IS_PREPARED   0x0040

Definition at line 173 of file reorderbuffer.h.

◆ rbtxn_is_prepared

#define rbtxn_is_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
)
#define RBTXN_IS_PREPARED

Definition at line 239 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 169 of file reorderbuffer.h.

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 195 of file reorderbuffer.h.

◆ RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 170 of file reorderbuffer.h.

◆ rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR

Definition at line 201 of file reorderbuffer.h.

◆ RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 171 of file reorderbuffer.h.

◆ rbtxn_is_streamed

#define rbtxn_is_streamed (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
#define RBTXN_IS_STREAMED

Definition at line 227 of file reorderbuffer.h.

◆ rbtxn_is_subtxn

#define rbtxn_is_subtxn (   txn)
Value:
( \
(txn)->toptxn != NULL \
)

Definition at line 275 of file reorderbuffer.h.

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 168 of file reorderbuffer.h.

◆ rbtxn_is_toptxn

#define rbtxn_is_toptxn (   txn)
Value:
( \
(txn)->toptxn == NULL \
)

Definition at line 269 of file reorderbuffer.h.

◆ RBTXN_PREPARE_STATUS_MASK

#define RBTXN_PREPARE_STATUS_MASK   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)

Definition at line 180 of file reorderbuffer.h.

◆ RBTXN_SENT_PREPARE

#define RBTXN_SENT_PREPARE   0x0200

Definition at line 176 of file reorderbuffer.h.

◆ rbtxn_sent_prepare

#define rbtxn_sent_prepare (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
)
#define RBTXN_SENT_PREPARE

Definition at line 245 of file reorderbuffer.h.

◆ rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE

Definition at line 263 of file reorderbuffer.h.

◆ RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0080

Definition at line 174 of file reorderbuffer.h.

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 458 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 461 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 467 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 474 of file reorderbuffer.h.

◆ ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 491 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferChangeType

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 478 of file reorderbuffer.h.

◆ ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 500 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 483 of file reorderbuffer.h.

◆ ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 495 of file reorderbuffer.h.

◆ ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 505 of file reorderbuffer.h.

◆ ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 521 of file reorderbuffer.h.

◆ ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 536 of file reorderbuffer.h.

◆ ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 531 of file reorderbuffer.h.

◆ ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 542 of file reorderbuffer.h.

◆ ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 526 of file reorderbuffer.h.

◆ ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 511 of file reorderbuffer.h.

◆ ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 516 of file reorderbuffer.h.

◆ ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 550 of file reorderbuffer.h.

◆ ReorderBufferTXN

◆ ReorderBufferUpdateProgressTxnCB

typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)

Definition at line 557 of file reorderbuffer.h.

Enumeration Type Documentation

◆ DebugLogicalRepStreamingMode

Enumerator
DEBUG_LOGICAL_REP_STREAMING_BUFFERED 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE 

Definition at line 31 of file reorderbuffer.h.

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 50 of file reorderbuffer.h.

51{
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:60
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 3040 of file reorderbuffer.c.

3042{
3043 ReorderBufferTXN *txn;
3044
3045 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3046 false);
3047
3048 /* unknown, nothing to remove */
3049 if (txn == NULL)
3050 return;
3051
3052 txn->xact_time.abort_time = abort_time;
3053
3054 /* For streamed transactions notify the remote node about the abort. */
3055 if (rbtxn_is_streamed(txn))
3056 {
3057 rb->stream_abort(rb, txn, lsn);
3058
3059 /*
3060 * We might have decoded changes for this transaction that could load
3061 * the cache as per the current transaction's view (consider DDL's
3062 * happened in this transaction). We don't want the decoding of future
3063 * transactions to use those cache entries so execute invalidations.
3064 */
3065 if (txn->ninvalidations > 0)
3066 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3067 txn->invalidations);
3068 }
3069
3070 /* cosmetic... */
3071 txn->final_lsn = lsn;
3072
3073 /* remove potential on-disk data, and deallocate */
3074 ReorderBufferCleanupTXN(rb, txn);
3075}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@116 xact_time
ReorderBufferStreamAbortCB stream_abort
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 3085 of file reorderbuffer.c.

3086{
3087 dlist_mutable_iter it;
3088
3089 /*
3090 * Iterate through all (potential) toplevel TXNs and abort all that are
3091 * older than what possibly can be running. Once we've found the first
3092 * that is alive we stop, there might be some that acquired an xid earlier
3093 * but started writing later, but it's unlikely and they will be cleaned
3094 * up in a later call to this function.
3095 */
3096 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
3097 {
3098 ReorderBufferTXN *txn;
3099
3100 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3101
3102 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3103 {
3104 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3105
3106 /* Notify the remote node about the crash/immediate restart. */
3107 if (rbtxn_is_streamed(txn))
3108 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3109
3110 /* remove potential on-disk data, and deallocate this tx */
3111 ReorderBufferCleanupTXN(rb, txn);
3112 }
3113 else
3114 return;
3115 }
3116}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:225
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
TransactionId xid
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3438 of file reorderbuffer.c.

3441{
3442 ReorderBufferTXN *txn;
3443 MemoryContext oldcontext;
3444 ReorderBufferChange *change;
3445
3446 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3447
3448 oldcontext = MemoryContextSwitchTo(rb->context);
3449
3450 /*
3451 * Collect all the invalidations under the top transaction, if available,
3452 * so that we can execute them all together. See comments atop this
3453 * function.
3454 */
3455 txn = rbtxn_get_toptxn(txn);
3456
3457 Assert(nmsgs > 0);
3458
3459 /* Accumulate invalidations. */
3460 if (txn->ninvalidations == 0)
3461 {
3462 txn->ninvalidations = nmsgs;
3463 txn->invalidations = (SharedInvalidationMessage *)
3464 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3465 memcpy(txn->invalidations, msgs,
3466 sizeof(SharedInvalidationMessage) * nmsgs);
3467 }
3468 else
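3469 {
3470 txn->invalidations = (SharedInvalidationMessage *)
3471 repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3472 (txn->ninvalidations + nmsgs));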
3473
3474 memcpy(txn->invalidations + txn->ninvalidations, msgs,
3475 nmsgs * sizeof(SharedInvalidationMessage));
3476 txn->ninvalidations += nmsgs;
3477 }
3478
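3479 change = ReorderBufferGetChange(rb);
3480 change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3481 change->data.inval.ninvalidations = nmsgs;
3482 change->data.inval.invalidations = (SharedInvalidationMessage *)
3483 palloc(sizeof(SharedInvalidationMessage) * nmsgs);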
3484 memcpy(change->data.inval.invalidations, msgs,
3485 sizeof(SharedInvalidationMessage) * nmsgs);
3486
3487 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3488
3489 MemoryContextSwitchTo(oldcontext);
3490}
#define Assert(condition)
Definition: c.h:815
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define rbtxn_get_toptxn(txn)
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@115 inval
SharedInvalidationMessage * invalidations
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3401 of file reorderbuffer.c.

3405{
3406 ReorderBufferChange *change = ReorderBufferGetChange(rb);
3407 ReorderBufferTXN *txn;
3408
3409 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3410
3411 change->data.tuplecid.locator = locator;
3412 change->data.tuplecid.tid = tid;
3413 change->data.tuplecid.cmin = cmin;
3414 change->data.tuplecid.cmax = cmax;
3415 change->data.tuplecid.combocid = combocid;
3416 change->lsn = lsn;
3417 change->txn = txn;
3418 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3419
3420 dlist_push_tail(&txn->tuplecids, &change->node);
3421 txn->ntuplecids++;
3422}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
struct ReorderBufferChange::@110::@114 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 312 of file reorderbuffer.c.

313{
314 ReorderBuffer *buffer;
315 HASHCTL hash_ctl;
316 MemoryContext new_ctx;
317
318 Assert(MyReplicationSlot != NULL);
319
320 /* allocate memory in own context, to have better accountability */
321 new_ctx = AllocSetContextCreate(CurrentMemoryContext,
322 "ReorderBuffer",
323 ALLOCSET_DEFAULT_SIZES);
324
325 buffer =
326 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
327
328 memset(&hash_ctl, 0, sizeof(hash_ctl));
329
330 buffer->context = new_ctx;
331
332 buffer->change_context = SlabContextCreate(new_ctx,
333 "Change",
334 SLAB_DEFAULT_BLOCK_SIZE,
335 sizeof(ReorderBufferChange));
336
337 buffer->txn_context = SlabContextCreate(new_ctx,
338 "TXN",
339 SLAB_DEFAULT_BLOCK_SIZE,
340 sizeof(ReorderBufferTXN));
341
342 /*
343 * To minimize memory fragmentation caused by long-running transactions
344 * with changes spanning multiple memory blocks, we use a single
345 * fixed-size memory block for decoded tuple storage. The performance
346 * testing showed that the default memory block size maintains logical
347 * decoding performance without causing fragmentation due to concurrent
348 * transactions. One might think that we can use the max size as
349 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
350 * the memory fragmentation.
351 */
352 buffer->tup_context = GenerationContextCreate(new_ctx,
353 "Tuples",
354 SLAB_DEFAULT_BLOCK_SIZE,
355 SLAB_DEFAULT_BLOCK_SIZE,
356 SLAB_DEFAULT_BLOCK_SIZE);
357
358 hash_ctl.keysize = sizeof(TransactionId);
359 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
360 hash_ctl.hcxt = buffer->context;
361
362 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
363 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
364
365 buffer->by_txn_last_xid = InvalidTransactionId;
366 buffer->by_txn_last_txn = NULL;
367
368 buffer->outbuf = NULL;
369 buffer->outbufsize = 0;
370 buffer->size = 0;
371
372 /* txn_heap is ordered by transaction size */
373 buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
374
375 buffer->spillTxns = 0;
376 buffer->spillCount = 0;
377 buffer->spillBytes = 0;
378 buffer->streamTxns = 0;
379 buffer->streamCount = 0;
380 buffer->streamBytes = 0;
381 buffer->totalTxns = 0;
382 buffer->totalBytes = 0;
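383
384 buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
385
386 dlist_init(&buffer->toplevel_by_lsn);
387 dlist_init(&buffer->txns_by_base_snapshot_lsn);
388 dclist_init(&buffer->catchange_txns);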
389
390 /*
391 * Ensure there's no stale data from prior uses of this slot, in case some
392 * prior exit avoided calling ReorderBufferFree. Failure to do this can
393 * produce duplicated txns, and it's very cheap if there's nothing there.
394 */
395 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
396
397 return buffer;
398}
#define NameStr(name)
Definition: c.h:703
uint32 TransactionId
Definition: c.h:609
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:181
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1080 of file reorderbuffer.c.

1082{
1083 ReorderBufferTXN *txn;
1084 ReorderBufferTXN *subtxn;
1085 bool new_top;
1086 bool new_sub;
1087
1088 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1089 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1090
1091 if (!new_sub)
1092 {
1093 if (rbtxn_is_known_subxact(subtxn))
1094 {
1095 /* already associated, nothing to do */
1096 return;
1097 }
1098 else
1099 {
1100 /*
1101 * We already saw this transaction, but initially added it to the
1102 * list of top-level txns. Now that we know it's not top-level,
1103 * remove it from there.
1104 */
1105 dlist_delete(&subtxn->node);
1106 }
1107 }
1108
1109 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1110 subtxn->toplevel_xid = xid;
1111 Assert(subtxn->nsubtxns == 0);
1112
1113 /* set the reference to top-level transaction */
1114 subtxn->toptxn = txn;
1115
1116 /* add to subtransaction list */
1117 dlist_push_tail(&txn->subtxns, &subtxn->node);
1118 txn->nsubtxns++;
1119
1120 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1122
1123 /* Verify LSN-ordering invariant */
1125}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2837 of file reorderbuffer.c.

2841{
2842 ReorderBufferTXN *txn;
2843
2844 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2845 false);
2846
2847 /* unknown transaction, nothing to replay */
2848 if (txn == NULL)
2849 return;
2850
2851 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2852 origin_id, origin_lsn);
2853}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1200 of file reorderbuffer.c.

1203{
1204 ReorderBufferTXN *subtxn;
1205
1206 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1207 InvalidXLogRecPtr, false);
1208
1209 /*
1210 * No need to do anything if that subtxn didn't contain any changes
1211 */
1212 if (!subtxn)
1213 return;
1214
1215 subtxn->final_lsn = commit_lsn;
1216 subtxn->end_lsn = end_lsn;
1217
1218 /*
1219 * Assign this subxact as a child of the toplevel xact (no-op if already
1220 * done).
1221 */
1223}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2954 of file reorderbuffer.c.

2959{
2960 ReorderBufferTXN *txn;
2961 XLogRecPtr prepare_end_lsn;
2962 TimestampTz prepare_time;
2963
2964 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2965
2966 /* unknown transaction, nothing to do */
2967 if (txn == NULL)
2968 return;
2969
2970 /*
2971 * By this time the txn has the prepare record information, remember it to
2972 * be later used for rollback.
2973 */
2974 prepare_end_lsn = txn->end_lsn;
2975 prepare_time = txn->xact_time.prepare_time;
2976
2977 /* add the gid in the txn */
2978 txn->gid = pstrdup(gid);
2979
2980 /*
2981 * It is possible that this transaction was not decoded at prepare time
2982 * either because by that time we didn't have a consistent snapshot, or
2983 * two_phase was not enabled, or it was decoded earlier but we have
2984 * restarted. We only need to send the prepare if it was not decoded
2985 * earlier. We don't need to decode the xact for aborts if it is not done
2986 * already.
2987 */
2988 if ((txn->final_lsn < two_phase_at) && is_commit)
2989 {
2990 /*
2991 * txn must have been marked as a prepared transaction and skipped but
2992 * not sent a prepare. Also, the prepare info must have been updated
2993 * in txn even if we skip prepare.
2994 */
2998
2999 /*
3000 * By this time the txn has the prepare record information and it is
3001 * important to use that so that downstream gets the accurate
3002 * information. If we passed the commit information here instead,
3003 * downstream could behave as if it had already replayed the commit
3004 * prepared after the restart.
3005 */
3006 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3007 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
3008 }
3009
3010 txn->final_lsn = commit_lsn;
3011 txn->end_lsn = end_lsn;
3012 txn->xact_time.commit_time = commit_time;
3013 txn->origin_id = origin_id;
3014 txn->origin_lsn = origin_lsn;
3015
3016 if (is_commit)
3017 rb->commit_prepared(rb, txn, commit_lsn);
3018 else
3019 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3020
3021 /* cleanup: make sure there's no cache pollution */
3023 txn->invalidations);
3024 ReorderBufferCleanupTXN(rb, txn);
3025}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3132 of file reorderbuffer.c.

3133{
3134 ReorderBufferTXN *txn;
3135
3136 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3137 false);
3138
3139 /* unknown, nothing to forget */
3140 if (txn == NULL)
3141 return;
3142
3143 /* this transaction mustn't be streamed */
3145
3146 /* cosmetic... */
3147 txn->final_lsn = lsn;
3148
3149 /*
3150 * Process cache invalidation messages if there are any. Even if we're not
3151 * interested in the transaction's contents, it could have manipulated the
3152 * catalog and we need to update the caches according to that.
3153 */
3154 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3156 txn->invalidations);
3157 else
3158 Assert(txn->ninvalidations == 0);
3159
3160 /* remove potential on-disk data, and deallocate */
3161 ReorderBufferCleanupTXN(rb, txn);
3162}
Snapshot base_snapshot

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 404 of file reorderbuffer.c.

405{
406 MemoryContext context = rb->context;
407
408 /*
409 * We free separately allocated data by entirely scrapping reorderbuffer's
410 * memory context.
411 */
412 MemoryContextDelete(context);
413
414 /* Free disk space used by unconsumed reorder buffers */
416}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3547 of file reorderbuffer.c.

3548{
3549 dlist_iter iter;
3550 TransactionId *xids = NULL;
3551 size_t xcnt = 0;
3552
3553 /* Quick return if the list is empty */
3554 if (dclist_count(&rb->catchange_txns) == 0)
3555 return NULL;
3556
3557 /* Initialize XID array */
3558 xids = (TransactionId *) palloc(sizeof(TransactionId) *
3560 dclist_foreach(iter, &rb->catchange_txns)
3561 {
3563 catchange_node,
3564 iter.cur);
3565
3567
3568 xids[xcnt++] = txn->xid;
3569 }
3570
3571 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3572
3573 Assert(xcnt == dclist_count(&rb->catchange_txns));
3574 return xids;
3575}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
#define qsort(a, b, c, d)
Definition: port.h:475
#define rbtxn_has_catalog_changes(txn)
dlist_node * cur
Definition: ilist.h:179
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1025 of file reorderbuffer.c.

1026{
1027 ReorderBufferTXN *txn;
1028
1030
1032 return NULL;
1033
1035
1038 return txn;
1039}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
XLogRecPtr first_lsn

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

◆ ReorderBufferGetRelids()

Oid * ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 606 of file reorderbuffer.c.

607{
608 Oid *relids;
609 Size alloc_len;
610
611 alloc_len = sizeof(Oid) * nrelids;
612
613 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
614
615 return relids;
616}
size_t Size
Definition: c.h:562
unsigned int Oid
Definition: postgres_ext.h:32

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 573 of file reorderbuffer.c.

574{
575 HeapTuple tuple;
576 Size alloc_len;
577
578 alloc_len = tuple_len + SizeofHeapTupleHeader;
579
581 HEAPTUPLESIZE + alloc_len);
582 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
583
584 return tuple;
585}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3204 of file reorderbuffer.c.

3206{
3207 bool use_subtxn = IsTransactionOrTransactionBlock();
3208 int i;
3209
3210 if (use_subtxn)
3212
3213 /*
3214 * Force invalidations to happen outside of a valid transaction - that way
3215 * entries will just be marked as invalid without accessing the catalog.
3216 * That's advantageous because we don't need to setup the full state
3217 * necessary for catalog access.
3218 */
3219 if (use_subtxn)
3221
3222 for (i = 0; i < ninvalidations; i++)
3223 LocalExecuteInvalidationMessage(&invalidations[i]);
3224
3225 if (use_subtxn)
3227}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:763
int i
Definition: isn.c:72
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4686
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4788
void AbortCurrentTransaction(void)
Definition: xact.c:3443

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3173 of file reorderbuffer.c.

3174{
3175 ReorderBufferTXN *txn;
3176
3177 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3178 false);
3179
3180 /* unknown, nothing to do */
3181 if (txn == NULL)
3182 return;
3183
3184 /*
3185 * Process cache invalidation messages if there are any. Even if we're not
3186 * interested in the transaction's contents, it could have manipulated the
3187 * catalog and we need to update the caches according to that.
3188 */
3189 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3191 txn->invalidations);
3192 else
3193 Assert(txn->ninvalidations == 0);
3194}

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2913 of file reorderbuffer.c.

2915{
2916 ReorderBufferTXN *txn;
2917
2918 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2919 false);
2920
2921 /* unknown transaction, nothing to replay */
2922 if (txn == NULL)
2923 return;
2924
2925 /*
2926 * txn must have been marked as a prepared transaction and must have
2927 * neither been skipped nor sent a prepare. Also, the prepare info must
2928 * have been updated in it by now.
2929 */
2932
2933 txn->gid = pstrdup(gid);
2934
2935 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2936 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2937
2938 /*
2939 * Send a prepare if not already done. This might occur if we have
2940 * detected a concurrent abort while replaying the non-streaming
2941 * transaction.
2942 */
2943 if (!rbtxn_sent_prepare(txn))
2944 {
2945 rb->prepare(rb, txn, txn->final_lsn);
2947 }
2948}
#define rbtxn_sent_prepare(txn)
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3240 of file reorderbuffer.c.

3241{
3242 /* many records won't have an xid assigned, centralize check here */
3243 if (xid != InvalidTransactionId)
3244 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3245}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 791 of file reorderbuffer.c.

793{
794 ReorderBufferTXN *txn;
795
796 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
797
798 /*
799 * If we have detected that the transaction is aborted while streaming the
800 * previous changes or by checking its CLOG, there is no point in
801 * collecting further changes for it.
802 */
803 if (rbtxn_is_aborted(txn))
804 {
805 /*
806 * We don't need to update memory accounting for this change as we
807 * have not added it to the queue yet.
808 */
809 ReorderBufferReturnChange(rb, change, false);
810 return;
811 }
812
813 /*
814 * The changes that are sent downstream are considered streamable. We
815 * remember such transactions so that only those will later be considered
816 * for streaming.
817 */
818 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
824 {
825 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
826
828 }
829
830 change->lsn = lsn;
831 change->txn = txn;
832
834 dlist_push_tail(&txn->changes, &change->node);
835 txn->nentries++;
836 txn->nentries_mem++;
837
838 /* update memory accounting information */
839 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
841
842 /* process partial change */
843 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
844
845 /* check the memory limits and evict something if needed */
847}
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_aborted(txn)
dlist_head changes

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 854 of file reorderbuffer.c.

858{
859 if (transactional)
860 {
861 MemoryContext oldcontext;
862 ReorderBufferChange *change;
863
865
866 /*
867 * We don't expect snapshots for transactional changes - we'll use the
868 * snapshot derived later during apply (unless the change gets
869 * skipped).
870 */
871 Assert(!snap);
872
873 oldcontext = MemoryContextSwitchTo(rb->context);
874
875 change = ReorderBufferGetChange(rb);
877 change->data.msg.prefix = pstrdup(prefix);
878 change->data.msg.message_size = message_size;
879 change->data.msg.message = palloc(message_size);
880 memcpy(change->data.msg.message, message, message_size);
881
882 ReorderBufferQueueChange(rb, xid, lsn, change, false);
883
884 MemoryContextSwitchTo(oldcontext);
885 }
886 else
887 {
888 ReorderBufferTXN *txn = NULL;
889 volatile Snapshot snapshot_now = snap;
890
891 /* Non-transactional changes require a valid snapshot. */
892 Assert(snapshot_now);
893
894 if (xid != InvalidTransactionId)
895 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
896
897 /* setup snapshot to allow catalog access */
898 SetupHistoricSnapshot(snapshot_now, NULL);
899 PG_TRY();
900 {
901 rb->message(rb, txn, lsn, false, prefix, message_size, message);
902
904 }
905 PG_CATCH();
906 {
908 PG_RE_THROW();
909 }
910 PG_END_TRY();
911 }
912}
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1613
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1597
struct ReorderBufferChange::@110::@113 msg
ReorderBufferMessageCB message

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2860 of file reorderbuffer.c.

2864{
2865 ReorderBufferTXN *txn;
2866
2867 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2868
2869 /* unknown transaction, nothing to do */
2870 if (txn == NULL)
2871 return false;
2872
2873 /*
2874 * Remember the prepare information to be later used by commit prepared in
2875 * case we skip doing prepare.
2876 */
2877 txn->final_lsn = prepare_lsn;
2878 txn->end_lsn = end_lsn;
2879 txn->xact_time.prepare_time = prepare_time;
2880 txn->origin_id = origin_id;
2881 txn->origin_lsn = origin_lsn;
2882
2883 /* Mark this transaction as a prepared transaction */
2886
2887 return true;
2888}

References Assert, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 503 of file reorderbuffer.c.

505{
506 /* update memory accounting info */
507 if (upd_mem)
508 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
510
511 /* free contained data */
512 switch (change->action)
513 {
518 if (change->data.tp.newtuple)
519 {
521 change->data.tp.newtuple = NULL;
522 }
523
524 if (change->data.tp.oldtuple)
525 {
527 change->data.tp.oldtuple = NULL;
528 }
529 break;
531 if (change->data.msg.prefix != NULL)
532 pfree(change->data.msg.prefix);
533 change->data.msg.prefix = NULL;
534 if (change->data.msg.message != NULL)
535 pfree(change->data.msg.message);
536 change->data.msg.message = NULL;
537 break;
539 if (change->data.inval.invalidations)
540 pfree(change->data.inval.invalidations);
541 change->data.inval.invalidations = NULL;
542 break;
544 if (change->data.snapshot)
545 {
547 change->data.snapshot = NULL;
548 }
549 break;
550 /* no data in addition to the struct itself */
552 if (change->data.truncate.relids != NULL)
553 {
555 change->data.truncate.relids = NULL;
556 }
557 break;
562 break;
563 }
564
565 pfree(change);
566}
void pfree(void *pointer)
Definition: mcxt.c:1521
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
struct ReorderBufferChange::@110::@112 truncate
struct ReorderBufferChange::@110::@111 tp

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 622 of file reorderbuffer.c.

623{
624 pfree(relids);
625}

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 591 of file reorderbuffer.c.

592{
593 pfree(tuple);
594}

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3271 of file reorderbuffer.c.

3273{
3274 ReorderBufferTXN *txn;
3275 bool is_new;
3276
3277 Assert(snap != NULL);
3278
3279 /*
3280 * Fetch the transaction to operate on. If we know it's a subtransaction,
3281 * operate on its top-level transaction instead.
3282 */
3283 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3284 if (rbtxn_is_known_subxact(txn))
3285 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3286 NULL, InvalidXLogRecPtr, false);
3287 Assert(txn->base_snapshot == NULL);
3288
3289 txn->base_snapshot = snap;
3290 txn->base_snapshot_lsn = lsn;
3292
3294}
XLogRecPtr base_snapshot_lsn
dlist_node base_snapshot_node

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1068 of file reorderbuffer.c.

1069{
1071}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2892 of file reorderbuffer.c.

2893{
2894 ReorderBufferTXN *txn;
2895
2896 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2897
2898 /* unknown transaction, nothing to do */
2899 if (txn == NULL)
2900 return;
2901
2902 /* txn must have been marked as a prepared transaction */
2905}

References Assert, InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3599 of file reorderbuffer.c.

3600{
3601 ReorderBufferTXN *txn;
3602
3603 txn = ReorderBufferTXNByXid(rb, xid, false,
3604 NULL, InvalidXLogRecPtr, false);
3605
3606 /* transaction isn't known yet, ergo no snapshot */
3607 if (txn == NULL)
3608 return false;
3609
3610 /* a known subtxn? operate on top-level txn instead */
3611 if (rbtxn_is_known_subxact(txn))
3612 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3613 NULL, InvalidXLogRecPtr, false);
3614
3615 return txn->base_snapshot != NULL;
3616}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 3582 of file reorderbuffer.c.

3583{
3584 ReorderBufferTXN *txn;
3585
3586 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3587 false);
3588 if (txn == NULL)
3589 return false;
3590
3591 return rbtxn_has_catalog_changes(txn);
3592}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3509 of file reorderbuffer.c.

3511{
3512 ReorderBufferTXN *txn;
3513
3514 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3515
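3516 if (!rbtxn_has_catalog_changes(txn))
3517 {
3518 txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3519 dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3520 }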
3521
3522 /*
3523 * Mark top-level transaction as having catalog changes too if one of its
3524 * children has so that the ReorderBufferBuildTupleCidHash can
3525 * conveniently check just top-level transaction and decide whether to
3526 * build the hash table or not.
3527 */
3528 if (rbtxn_is_subtxn(txn))
3529 {
3530 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3531
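3532 if (!rbtxn_has_catalog_changes(toptxn))
3533 {
3534 toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3535 dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3536 }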
3537 }
3538}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
dlist_node catchange_node

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4778 of file reorderbuffer.c.

4779{
4780 DIR *logical_dir;
4781 struct dirent *logical_de;
4782
4783 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4784 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4785 {
4786 if (strcmp(logical_de->d_name, ".") == 0 ||
4787 strcmp(logical_de->d_name, "..") == 0)
4788 continue;
4789
4790 /* if it cannot be a slot, skip the directory */
4791 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4792 continue;
4793
4794 /*
4795 * ok, has to be a surviving logical slot, iterate and delete
4796 * everything starting with xid-*
4797 */
4798 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4799 }
4800 FreeDir(logical_dir);
4801}
int FreeDir(DIR *dir)
Definition: fd.c:2983
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2865
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2931
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

Variable Documentation

◆ debug_logical_replication_streaming

PGDLLIMPORT int debug_logical_replication_streaming
extern

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem
extern

Definition at line 213 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().