PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
#define RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define RBTXN_IS_SUBXACT   0x0002
 
#define RBTXN_IS_SERIALIZED   0x0004
 
#define rbtxn_has_catalog_changes(txn)
 
#define rbtxn_is_known_subxact(txn)
 
#define rbtxn_is_serialized(txn)
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Variables

PGDLLIMPORT int logical_decoding_work_mem
 

Macro Definition Documentation

◆ RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 162 of file reorderbuffer.h.

Referenced by ReorderBufferXidSetCatalogChanges().

◆ rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES

Definition at line 167 of file reorderbuffer.h.

Referenced by ReorderBufferBuildTupleCidHash(), and ReorderBufferXidHasCatalogChanges().

◆ rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn)

◆ RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 164 of file reorderbuffer.h.

Referenced by ReorderBufferSerializeTXN().

◆ rbtxn_is_serialized

#define rbtxn_is_serialized (   txn)
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED

Definition at line 179 of file reorderbuffer.h.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNInit(), and ReorderBufferSerializeTXN().

◆ RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 163 of file reorderbuffer.h.

Referenced by ReorderBufferAssignChild().

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 316 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 319 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 325 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 332 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 336 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 341 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 54 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1924 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1925 {
1926  ReorderBufferTXN *txn;
1927 
1928  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1929  false);
1930 
1931  /* unknown, nothing to remove */
1932  if (txn == NULL)
1933  return;
1934 
1935  /* cosmetic... */
1936  txn->final_lsn = lsn;
1937 
1938  /* remove potential on-disk data, and deallocate */
1939  ReorderBufferCleanupTXN(rb, txn);
1940 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1950 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1951 {
1952  dlist_mutable_iter it;
1953 
1954  /*
1955  * Iterate through all (potential) toplevel TXNs and abort all that are
1956  * older than what possibly can be running. Once we've found the first
1957  * that is alive we stop, there might be some that acquired an xid earlier
1958  * but started writing later, but it's unlikely and they will be cleaned
1959  * up in a later call to this function.
1960  */
1961  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1962  {
1963  ReorderBufferTXN *txn;
1964 
1965  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1966 
1967  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1968  {
1969  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1970 
1971  /* remove potential on-disk data, and deallocate this tx */
1972  ReorderBufferCleanupTXN(rb, txn);
1973  }
1974  else
1975  return;
1976  }
1977 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 2211 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2214 {
2215  ReorderBufferTXN *txn;
2216 
2217  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2218 
2219  if (txn->ninvalidations != 0)
2220  elog(ERROR, "only ever add one set of invalidations");
2221 
2222  Assert(nmsgs > 0);
2223 
2224  txn->ninvalidations = nmsgs;
2226  MemoryContextAlloc(rb->context,
2227  sizeof(SharedInvalidationMessage) * nmsgs);
2228  memcpy(txn->invalidations, msgs,
2229  sizeof(SharedInvalidationMessage) * nmsgs);
2230 }
#define ERROR
Definition: elog.h:43
#define Assert(condition)
Definition: c.h:738
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2127 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2129 {
2131 
2132  change->data.command_id = cid;
2134 
2135  ReorderBufferQueueChange(rb, xid, lsn, change);
2136 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2182 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2186 {
2188  ReorderBufferTXN *txn;
2189 
2190  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2191 
2192  change->data.tuplecid.node = node;
2193  change->data.tuplecid.tid = tid;
2194  change->data.tuplecid.cmin = cmin;
2195  change->data.tuplecid.cmax = cmax;
2196  change->data.tuplecid.combocid = combocid;
2197  change->lsn = lsn;
2198  change->txn = txn;
2200 
2201  dlist_push_tail(&txn->tuplecids, &change->node);
2202  txn->ntuplecids++;
2203 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@99::@103 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 272 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

273 {
274  ReorderBuffer *buffer;
275  HASHCTL hash_ctl;
276  MemoryContext new_ctx;
277 
278  Assert(MyReplicationSlot != NULL);
279 
280  /* allocate memory in own context, to have better accountability */
282  "ReorderBuffer",
284 
285  buffer =
286  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
287 
288  memset(&hash_ctl, 0, sizeof(hash_ctl));
289 
290  buffer->context = new_ctx;
291 
292  buffer->change_context = SlabContextCreate(new_ctx,
293  "Change",
295  sizeof(ReorderBufferChange));
296 
297  buffer->txn_context = SlabContextCreate(new_ctx,
298  "TXN",
300  sizeof(ReorderBufferTXN));
301 
302  buffer->tup_context = GenerationContextCreate(new_ctx,
303  "Tuples",
305 
306  hash_ctl.keysize = sizeof(TransactionId);
307  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
308  hash_ctl.hcxt = buffer->context;
309 
310  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
312 
314  buffer->by_txn_last_txn = NULL;
315 
316  buffer->outbuf = NULL;
317  buffer->outbufsize = 0;
318  buffer->size = 0;
319 
320  buffer->spillCount = 0;
321  buffer->spillTxns = 0;
322  buffer->spillBytes = 0;
323 
325 
326  dlist_init(&buffer->toplevel_by_lsn);
328 
329  /*
330  * Ensure there's no stale data from prior uses of this slot, in case some
331  * prior exit avoided calling ReorderBufferFree. Failure to do this can
332  * produce duplicated txns, and it's very cheap if there's nothing there.
333  */
335 
336  return buffer;
337 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:513
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:140
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define NameStr(name)
Definition: c.h:615
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 830 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by DecodeXactOp(), and ReorderBufferCommitChild().

832 {
833  ReorderBufferTXN *txn;
834  ReorderBufferTXN *subtxn;
835  bool new_top;
836  bool new_sub;
837 
838  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
839  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
840 
841  if (!new_sub)
842  {
843  if (rbtxn_is_known_subxact(subtxn))
844  {
845  /* already associated, nothing to do */
846  return;
847  }
848  else
849  {
850  /*
851  * We already saw this transaction, but initially added it to the
852  * list of top-level txns. Now that we know it's not top-level,
853  * remove it from there.
854  */
855  dlist_delete(&subtxn->node);
856  }
857  }
858 
859  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
860  subtxn->toplevel_xid = xid;
861  Assert(subtxn->nsubtxns == 0);
862 
863  /* add to subtransaction list */
864  dlist_push_tail(&txn->subtxns, &subtxn->node);
865  txn->nsubtxns++;
866 
867  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
869 
870  /* Verify LSN-ordering invariant */
871  AssertTXNLsnOrder(rb);
872 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1502 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), 
TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1506 {
1507  ReorderBufferTXN *txn;
1508  volatile Snapshot snapshot_now;
1509  volatile CommandId command_id = FirstCommandId;
1510  bool using_subtxn;
1511  ReorderBufferIterTXNState *volatile iterstate = NULL;
1512 
1513  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1514  false);
1515 
1516  /* unknown transaction, nothing to replay */
1517  if (txn == NULL)
1518  return;
1519 
1520  txn->final_lsn = commit_lsn;
1521  txn->end_lsn = end_lsn;
1522  txn->commit_time = commit_time;
1523  txn->origin_id = origin_id;
1524  txn->origin_lsn = origin_lsn;
1525 
1526  /*
1527  * If this transaction has no snapshot, it didn't make any changes to the
1528  * database, so there's nothing to decode. Note that
1529  * ReorderBufferCommitChild will have transferred any snapshots from
1530  * subtransactions if there were any.
1531  */
1532  if (txn->base_snapshot == NULL)
1533  {
1534  Assert(txn->ninvalidations == 0);
1535  ReorderBufferCleanupTXN(rb, txn);
1536  return;
1537  }
1538 
1539  snapshot_now = txn->base_snapshot;
1540 
1541  /* build data to be able to lookup the CommandIds of catalog tuples */
1543 
1544  /* setup the initial snapshot */
1545  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1546 
1547  /*
1548  * Decoding needs access to syscaches et al., which in turn use
1549  * heavyweight locks and such. Thus we need to have enough state around to
1550  * keep track of those. The easiest way is to simply use a transaction
1551  * internally. That also allows us to easily enforce that nothing writes
1552  * to the database by checking for xid assignments.
1553  *
1554  * When we're called via the SQL SRF there's already a transaction
1555  * started, so start an explicit subtransaction there.
1556  */
1557  using_subtxn = IsTransactionOrTransactionBlock();
1558 
1559  PG_TRY();
1560  {
1561  ReorderBufferChange *change;
1562  ReorderBufferChange *specinsert = NULL;
1563 
1564  if (using_subtxn)
1565  BeginInternalSubTransaction("replay");
1566  else
1568 
1569  rb->begin(rb, txn);
1570 
1571  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1572  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1573  {
1574  Relation relation = NULL;
1575  Oid reloid;
1576 
1577  switch (change->action)
1578  {
1580 
1581  /*
1582  * Confirmation for speculative insertion arrived. Simply
1583  * use as a normal record. It'll be cleaned up at the end
1584  * of INSERT processing.
1585  */
1586  if (specinsert == NULL)
1587  elog(ERROR, "invalid ordering of speculative insertion changes");
1588  Assert(specinsert->data.tp.oldtuple == NULL);
1589  change = specinsert;
1591 
1592  /* intentionally fall through */
1596  Assert(snapshot_now);
1597 
1598  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1599  change->data.tp.relnode.relNode);
1600 
1601  /*
1602  * Mapped catalog tuple without data, emitted while
1603  * catalog table was in the process of being rewritten. We
1604  * can fail to look up the relfilenode, because the
1605  * relmapper has no "historic" view, in contrast to
1606  * the normal catalog during decoding. Thus repeated
1607  * rewrites can cause a lookup failure. That's OK because
1608  * we do not decode catalog changes anyway. Normally such
1609  * tuples would be skipped over below, but we can't
1610  * identify whether the table should be logically logged
1611  * without mapping the relfilenode to the oid.
1612  */
1613  if (reloid == InvalidOid &&
1614  change->data.tp.newtuple == NULL &&
1615  change->data.tp.oldtuple == NULL)
1616  goto change_done;
1617  else if (reloid == InvalidOid)
1618  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1619  relpathperm(change->data.tp.relnode,
1620  MAIN_FORKNUM));
1621 
1622  relation = RelationIdGetRelation(reloid);
1623 
1624  if (!RelationIsValid(relation))
1625  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1626  reloid,
1627  relpathperm(change->data.tp.relnode,
1628  MAIN_FORKNUM));
1629 
1630  if (!RelationIsLogicallyLogged(relation))
1631  goto change_done;
1632 
1633  /*
1634  * Ignore temporary heaps created during DDL unless the
1635  * plugin has asked for them.
1636  */
1637  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1638  goto change_done;
1639 
1640  /*
1641  * For now ignore sequence changes entirely. Most of the
1642  * time they don't log changes using records we
1643  * understand, so it doesn't make sense to handle the few
1644  * cases we do.
1645  */
1646  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1647  goto change_done;
1648 
1649  /* user-triggered change */
1650  if (!IsToastRelation(relation))
1651  {
1652  ReorderBufferToastReplace(rb, txn, relation, change);
1653  rb->apply_change(rb, txn, relation, change);
1654 
1655  /*
1656  * Only clear reassembled toast chunks if we're sure
1657  * they're not required anymore. The creator of the
1658  * tuple tells us.
1659  */
1660  if (change->data.tp.clear_toast_afterwards)
1661  ReorderBufferToastReset(rb, txn);
1662  }
1663  /* we're not interested in toast deletions */
1664  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1665  {
1666  /*
1667  * Need to reassemble the full toasted Datum in
1668  * memory, to ensure the chunks don't get reused till
1669  * we're done, remove it from the list of this
1670  * transaction's changes. Otherwise it will get
1671  * freed/reused while restoring spooled data from
1672  * disk.
1673  */
1674  Assert(change->data.tp.newtuple != NULL);
1675 
1676  dlist_delete(&change->node);
1677  ReorderBufferToastAppendChunk(rb, txn, relation,
1678  change);
1679  }
1680 
1681  change_done:
1682 
1683  /*
1684  * Either speculative insertion was confirmed, or it was
1685  * unsuccessful and the record isn't needed anymore.
1686  */
1687  if (specinsert != NULL)
1688  {
1689  ReorderBufferReturnChange(rb, specinsert);
1690  specinsert = NULL;
1691  }
1692 
1693  if (relation != NULL)
1694  {
1695  RelationClose(relation);
1696  relation = NULL;
1697  }
1698  break;
1699 
1701 
1702  /*
1703  * Speculative insertions are dealt with by delaying the
1704  * processing of the insert until the confirmation record
1705  * arrives. For that we simply unlink the record from the
1706  * chain, so it does not get freed/reused while restoring
1707  * spooled data from disk.
1708  *
1709  * This is safe in the face of concurrent catalog changes
1710  * because the relevant relation can't be changed between
1711  * speculative insertion and confirmation due to
1712  * CheckTableNotInUse() and locking.
1713  */
1714 
1715  /* clear out a pending (and thus failed) speculation */
1716  if (specinsert != NULL)
1717  {
1718  ReorderBufferReturnChange(rb, specinsert);
1719  specinsert = NULL;
1720  }
1721 
1722  /* and memorize the pending insertion */
1723  dlist_delete(&change->node);
1724  specinsert = change;
1725  break;
1726 
1728  {
1729  int i;
1730  int nrelids = change->data.truncate.nrelids;
1731  int nrelations = 0;
1732  Relation *relations;
1733 
1734  relations = palloc0(nrelids * sizeof(Relation));
1735  for (i = 0; i < nrelids; i++)
1736  {
1737  Oid relid = change->data.truncate.relids[i];
1738  Relation relation;
1739 
1740  relation = RelationIdGetRelation(relid);
1741 
1742  if (!RelationIsValid(relation))
1743  elog(ERROR, "could not open relation with OID %u", relid);
1744 
1745  if (!RelationIsLogicallyLogged(relation))
1746  continue;
1747 
1748  relations[nrelations++] = relation;
1749  }
1750 
1751  rb->apply_truncate(rb, txn, nrelations, relations, change);
1752 
1753  for (i = 0; i < nrelations; i++)
1754  RelationClose(relations[i]);
1755 
1756  break;
1757  }
1758 
1760  rb->message(rb, txn, change->lsn, true,
1761  change->data.msg.prefix,
1762  change->data.msg.message_size,
1763  change->data.msg.message);
1764  break;
1765 
1767  /* get rid of the old */
1768  TeardownHistoricSnapshot(false);
1769 
1770  if (snapshot_now->copied)
1771  {
1772  ReorderBufferFreeSnap(rb, snapshot_now);
1773  snapshot_now =
1774  ReorderBufferCopySnap(rb, change->data.snapshot,
1775  txn, command_id);
1776  }
1777 
1778  /*
1779  * Restored from disk, need to be careful not to double
1780  * free. We could introduce refcounting for that, but for
1781  * now this seems infrequent enough not to care.
1782  */
1783  else if (change->data.snapshot->copied)
1784  {
1785  snapshot_now =
1786  ReorderBufferCopySnap(rb, change->data.snapshot,
1787  txn, command_id);
1788  }
1789  else
1790  {
1791  snapshot_now = change->data.snapshot;
1792  }
1793 
1794 
1795  /* and continue with the new one */
1796  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1797  break;
1798 
1800  Assert(change->data.command_id != InvalidCommandId);
1801 
1802  if (command_id < change->data.command_id)
1803  {
1804  command_id = change->data.command_id;
1805 
1806  if (!snapshot_now->copied)
1807  {
1808  /* we don't use the global one anymore */
1809  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1810  txn, command_id);
1811  }
1812 
1813  snapshot_now->curcid = command_id;
1814 
1815  TeardownHistoricSnapshot(false);
1816  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1817 
1818  /*
1819  * Every time the CommandId is incremented, we could
1820  * see new catalog contents, so execute all
1821  * invalidations.
1822  */
1824  }
1825 
1826  break;
1827 
1829  elog(ERROR, "tuplecid value in changequeue");
1830  break;
1831  }
1832  }
1833 
1834  /*
1835  * There's a speculative insertion remaining, just clean it up, it
1836  * can't have been successful, otherwise we'd have gotten a confirmation
1837  * record.
1838  */
1839  if (specinsert)
1840  {
1841  ReorderBufferReturnChange(rb, specinsert);
1842  specinsert = NULL;
1843  }
1844 
1845  /* clean up the iterator */
1846  ReorderBufferIterTXNFinish(rb, iterstate);
1847  iterstate = NULL;
1848 
1849  /* call commit callback */
1850  rb->commit(rb, txn, commit_lsn);
1851 
1852  /* this is just a sanity check against bad output plugin behaviour */
1854  elog(ERROR, "output plugin used XID %u",
1856 
1857  /* cleanup */
1858  TeardownHistoricSnapshot(false);
1859 
1860  /*
1861  * Aborting the current (sub-)transaction as a whole has the right
1862  * semantics. We want all locks acquired in here to be released, not
1863  * reassigned to the parent and we do not want any database access
1864  * have persistent effects.
1865  */
1867 
1868  /* make sure there's no cache pollution */
1870 
1871  if (using_subtxn)
1873 
1874  if (snapshot_now->copied)
1875  ReorderBufferFreeSnap(rb, snapshot_now);
1876 
1877  /* remove potential on-disk data, and deallocate */
1878  ReorderBufferCleanupTXN(rb, txn);
1879  }
1880  PG_CATCH();
1881  {
1882  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1883  if (iterstate)
1884  ReorderBufferIterTXNFinish(rb, iterstate);
1885 
1887 
1888  /*
1889  * Force cache invalidation to happen outside of a valid transaction
1890  * to prevent catalog access as we just caught an error.
1891  */
1893 
1894  /* make sure there's no cache pollution */
1896 
1897  if (using_subtxn)
1899 
1900  if (snapshot_now->copied)
1901  ReorderBufferFreeSnap(rb, snapshot_now);
1902 
1903  /* remove potential on-disk data, and deallocate */
1904  ReorderBufferCleanupTXN(rb, txn);
1905 
1906  PG_RE_THROW();
1907  }
1908  PG_END_TRY();
1909 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:527
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:3181
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
struct ReorderBufferChange::@99::@101 truncate
bool copied
Definition: snapshot.h:185
struct ReorderBufferChange::@99::@102 msg
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4672
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:529
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:429
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4483
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:440
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2102
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidCommandId
Definition: c.h:530
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferChange::@99 data
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:305
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2816
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4378
#define PG_RE_THROW()
Definition: elog.h:336
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:214
int i
#define PG_TRY()
Definition: elog.h:295
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1996
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 947 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

950 {
951  ReorderBufferTXN *subtxn;
952 
953  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
954  InvalidXLogRecPtr, false);
955 
956  /*
957  * No need to do anything if that subtxn didn't contain any changes
958  */
959  if (!subtxn)
960  return;
961 
962  subtxn->final_lsn = commit_lsn;
963  subtxn->end_lsn = end_lsn;
964 
965  /*
966  * Assign this subxact as a child of the toplevel xact (no-op if already
967  * done.)
968  */
970 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1993 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1994 {
1995  ReorderBufferTXN *txn;
1996 
1997  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1998  false);
1999 
2000  /* unknown, nothing to forget */
2001  if (txn == NULL)
2002  return;
2003 
2004  /* cosmetic... */
2005  txn->final_lsn = lsn;
2006 
2007  /*
2008  * Process cache invalidation messages if there are any. Even if we're not
2009  * interested in the transaction's contents, it could have manipulated the
2010  * catalog and we need to update the caches according to that.
2011  */
2012  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2014  txn->invalidations);
2015  else
2016  Assert(txn->ninvalidations == 0);
2017 
2018  /* remove potential on-disk data, and deallocate */
2019  ReorderBufferCleanupTXN(rb, txn);
2020 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:738
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * )

Definition at line 343 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

344 {
345  MemoryContext context = rb->context;
346 
347  /*
348  * We free separately allocated data by entirely scrapping reorderbuffer's
349  * memory context.
350  */
351  MemoryContextDelete(context);
352 
353  /* Free disk space used by unconsumed reorder buffers */
355 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:140
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:615
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 411 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

412 {
413  ReorderBufferChange *change;
414 
415  change = (ReorderBufferChange *)
416  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
417 
418  memset(change, 0, sizeof(ReorderBufferChange));
419  return change;
420 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * )

Definition at line 775 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

776 {
777  ReorderBufferTXN *txn;
778 
779  AssertTXNLsnOrder(rb);
780 
781  if (dlist_is_empty(&rb->toplevel_by_lsn))
782  return NULL;
783 
784  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
785 
788  return txn;
789 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 803 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

804 {
805  ReorderBufferTXN *txn;
806 
807  AssertTXNLsnOrder(rb);
808 
810  return InvalidTransactionId;
811 
812  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
814  return txn->base_snapshot->xmin;
815 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * ,
int  nrelids 
)

Definition at line 523 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

524 {
525  Oid *relids;
526  Size alloc_len;
527 
528  alloc_len = sizeof(Oid) * nrelids;
529 
530  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
531 
532  return relids;
533 }
unsigned int Oid
Definition: postgres_ext.h:31
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * ,
Size  tuple_len 
)

Definition at line 487 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

488 {
489  ReorderBufferTupleBuf *tuple;
490  Size alloc_len;
491 
492  alloc_len = tuple_len + SizeofHeapTupleHeader;
493 
494  tuple = (ReorderBufferTupleBuf *)
495  MemoryContextAlloc(rb->tup_context,
496  sizeof(ReorderBufferTupleBuf) +
497  MAXIMUM_ALIGNOF + alloc_len);
498  tuple->alloc_tuple_size = alloc_len;
499  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
500 
501  return tuple;
502 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 2029 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

2031 {
2032  bool use_subtxn = IsTransactionOrTransactionBlock();
2033  int i;
2034 
2035  if (use_subtxn)
2036  BeginInternalSubTransaction("replay");
2037 
2038  /*
2039  * Force invalidations to happen outside of a valid transaction - that way
2040  * entries will just be marked as invalid without accessing the catalog.
2041  * That's advantageous because we don't need to set up the full state
2042  * necessary for catalog access.
2043  */
2044  if (use_subtxn)
2046 
2047  for (i = 0; i < ninvalidations; i++)
2048  LocalExecuteInvalidationMessage(&invalidations[i]);
2049 
2050  if (use_subtxn)
2052 }
void AbortCurrentTransaction(void)
Definition: xact.c:3181
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4672
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4483
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4378
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2065 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2066 {
2067  /* many records won't have an xid assigned, centralize check here */
2068  if (xid != InvalidTransactionId)
2069  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2070 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange *  
)

Definition at line 634 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

636 {
637  ReorderBufferTXN *txn;
638 
639  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
640 
641  change->lsn = lsn;
642  change->txn = txn;
643 
644  Assert(InvalidXLogRecPtr != lsn);
645  dlist_push_tail(&txn->changes, &change->node);
646  txn->nentries++;
647  txn->nentries_mem++;
648 
649  /* update memory accounting information */
650  ReorderBufferChangeMemoryUpdate(rb, change, true);
651 
652  /* check the memory limits and evict something if needed */
654 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 660 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

664 {
665  if (transactional)
666  {
667  MemoryContext oldcontext;
668  ReorderBufferChange *change;
669 
671 
672  oldcontext = MemoryContextSwitchTo(rb->context);
673 
674  change = ReorderBufferGetChange(rb);
676  change->data.msg.prefix = pstrdup(prefix);
677  change->data.msg.message_size = message_size;
678  change->data.msg.message = palloc(message_size);
679  memcpy(change->data.msg.message, message, message_size);
680 
681  ReorderBufferQueueChange(rb, xid, lsn, change);
682 
683  MemoryContextSwitchTo(oldcontext);
684  }
685  else
686  {
687  ReorderBufferTXN *txn = NULL;
688  volatile Snapshot snapshot_now = snapshot;
689 
690  if (xid != InvalidTransactionId)
691  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
692 
693  /* setup snapshot to allow catalog access */
694  SetupHistoricSnapshot(snapshot_now, NULL);
695  PG_TRY();
696  {
697  rb->message(rb, txn, lsn, false, prefix, message_size, message);
698 
700  }
701  PG_CATCH();
702  {
704  PG_RE_THROW();
705  }
706  PG_END_TRY();
707  }
708 }
struct ReorderBufferChange::@99::@102 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@99 data
#define PG_CATCH()
Definition: elog.h:305
#define Assert(condition)
Definition: c.h:738
#define PG_RE_THROW()
Definition: elog.h:336
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:949
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:295
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange *  
)

Definition at line 426 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

427 {
428  /* update memory accounting info */
429  ReorderBufferChangeMemoryUpdate(rb, change, false);
430 
431  /* free contained data */
432  switch (change->action)
433  {
438  if (change->data.tp.newtuple)
439  {
440  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
441  change->data.tp.newtuple = NULL;
442  }
443 
444  if (change->data.tp.oldtuple)
445  {
446  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
447  change->data.tp.oldtuple = NULL;
448  }
449  break;
451  if (change->data.msg.prefix != NULL)
452  pfree(change->data.msg.prefix);
453  change->data.msg.prefix = NULL;
454  if (change->data.msg.message != NULL)
455  pfree(change->data.msg.message);
456  change->data.msg.message = NULL;
457  break;
459  if (change->data.snapshot)
460  {
461  ReorderBufferFreeSnap(rb, change->data.snapshot);
462  change->data.snapshot = NULL;
463  }
464  break;
465  /* no data in addition to the struct itself */
467  if (change->data.truncate.relids != NULL)
468  {
469  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
470  change->data.truncate.relids = NULL;
471  }
472  break;
476  break;
477  }
478 
479  pfree(change);
480 }
void pfree(void *pointer)
Definition: mcxt.c:1056
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * ,
Oid * relids 
)

Definition at line 539 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

540 {
541  pfree(relids);
542 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf * tuple 
)

Definition at line 508 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

509 {
510  pfree(tuple);
511 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * ,
XLogRecPtr  ptr 
)

Definition at line 818 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

819 {
820  rb->current_restart_decoding_lsn = ptr;
821 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2281 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2282 {
2283  ReorderBufferTXN *txn;
2284 
2285  txn = ReorderBufferTXNByXid(rb, xid, false,
2286  NULL, InvalidXLogRecPtr, false);
2287 
2288  /* transaction isn't known yet, ergo no snapshot */
2289  if (txn == NULL)
2290  return false;
2291 
2292  /* a known subtxn? operate on top-level txn instead */
2293  if (rbtxn_is_known_subxact(txn))
2294  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2295  NULL, InvalidXLogRecPtr, false);
2296 
2297  return txn->base_snapshot != NULL;
2298 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2264 of file reorderbuffer.c.

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

2265 {
2266  ReorderBufferTXN *txn;
2267 
2268  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2269  false);
2270  if (txn == NULL)
2271  return false;
2272 
2273  return rbtxn_has_catalog_changes(txn);
2274 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_has_catalog_changes(txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2249 of file reorderbuffer.c.

References RBTXN_HAS_CATALOG_CHANGES, ReorderBufferTXNByXid(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

2251 {
2252  ReorderBufferTXN *txn;
2253 
2254  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2255 
2257 }
#define RBTXN_HAS_CATALOG_CHANGES
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 3153 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

3154 {
3155  DIR *logical_dir;
3156  struct dirent *logical_de;
3157 
3158  logical_dir = AllocateDir("pg_replslot");
3159  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
3160  {
3161  if (strcmp(logical_de->d_name, ".") == 0 ||
3162  strcmp(logical_de->d_name, "..") == 0)
3163  continue;
3164 
3165  /* if it cannot be a slot, skip the directory */
3166  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
3167  continue;
3168 
3169  /*
3170  * ok, has to be a surviving logical slot, iterate and delete
3171  * everything starting with xid-*
3172  */
3174  }
3175  FreeDir(logical_dir);
3176 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2581
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2647
char d_name[MAX_PATH]
Definition: dirent.h:14
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2699

Variable Documentation

◆ logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem

Definition at line 193 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().