PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Macro Definition Documentation

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

Definition at line 290 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 293 of file reorderbuffer.h.

◆ ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 300 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 308 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 313 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 319 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 52 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1692 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1693 {
1694  ReorderBufferTXN *txn;
1695 
1696  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1697  false);
1698 
1699  /* unknown, nothing to remove */
1700  if (txn == NULL)
1701  return;
1702 
1703  /* cosmetic... */
1704  txn->final_lsn = lsn;
1705 
1706  /* remove potential on-disk data, and deallocate */
1707  ReorderBufferCleanupTXN(rb, txn);
1708 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1718 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1719 {
1720  dlist_mutable_iter it;
1721 
1722  /*
1723  * Iterate through all (potential) toplevel TXNs and abort all that are
1724  * older than what possibly can be running. Once we've found the first
1725  * that is alive we stop, there might be some that acquired an xid earlier
1726  * but started writing later, but it's unlikely and they will be cleaned
1727  * up in a later call to this function.
1728  */
1729  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1730  {
1731  ReorderBufferTXN *txn;
1732 
1733  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1734 
1735  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1736  {
1737  /*
1738  * We set final_lsn on a transaction when we decode its commit or
1739  * abort record, but we never see those records for crashed
1740  * transactions. To ensure cleanup of these transactions, set
1741  * final_lsn to that of their last change; this causes
1742  * ReorderBufferRestoreCleanup to do the right thing.
1743  */
1744  if (txn->serialized && txn->final_lsn == 0)
1745  {
1746  ReorderBufferChange *last =
1747  dlist_tail_element(ReorderBufferChange, node, &txn->changes);
1748 
1749  txn->final_lsn = last->lsn;
1750  }
1751 
1752  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1753 
1754  /* remove potential on-disk data, and deallocate this tx */
1755  ReorderBufferCleanupTXN(rb, txn);
1756  }
1757  else
1758  return;
1759  }
1760 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog
Definition: elog.h:219

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 1945 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1948 {
1949  ReorderBufferTXN *txn;
1950 
1951  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1952 
1953  if (txn->ninvalidations != 0)
1954  elog(ERROR, "only ever add one set of invalidations");
1955 
1956  Assert(nmsgs > 0);
1957 
1958  txn->ninvalidations = nmsgs;
1959  txn->invalidations = (SharedInvalidationMessage *)
1960  MemoryContextAlloc(rb->context,
1961  sizeof(SharedInvalidationMessage) * nmsgs);
1962  memcpy(txn->invalidations, msgs,
1963  sizeof(SharedInvalidationMessage) * nmsgs);
1964 }
#define ERROR
Definition: elog.h:43
#define Assert(condition)
Definition: c.h:699
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
#define elog
Definition: elog.h:219

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1901 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

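1903 {
1904  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1905 
1906  change->data.command_id = cid;
1907  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1908 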
1909  ReorderBufferQueueChange(rb, xid, lsn, change);
1910 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1917 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

1921 {
1922  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1923  ReorderBufferTXN *txn;
1924 
1925  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1926 
1927  change->data.tuplecid.node = node;
1928  change->data.tuplecid.tid = tid;
1929  change->data.tuplecid.cmin = cmin;
1930  change->data.tuplecid.cmax = cmax;
1931  change->data.tuplecid.combocid = combocid;
1932  change->lsn = lsn;
1933  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1934 
1935  dlist_push_tail(&txn->tuplecids, &change->node);
1936  txn->ntuplecids++;
1937 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@106 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 224 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

225 {
226  ReorderBuffer *buffer;
227  HASHCTL hash_ctl;
228  MemoryContext new_ctx;
229 
230  Assert(MyReplicationSlot != NULL);
231 
232  /* allocate memory in own context, to have better accountability */
233  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234  "ReorderBuffer",
235  ALLOCSET_DEFAULT_SIZES);
236 
237  buffer =
238  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
239 
240  memset(&hash_ctl, 0, sizeof(hash_ctl));
241 
242  buffer->context = new_ctx;
243 
244  buffer->change_context = SlabContextCreate(new_ctx,
245  "Change",
246  SLAB_DEFAULT_BLOCK_SIZE,
247  sizeof(ReorderBufferChange));
248 
249  buffer->txn_context = SlabContextCreate(new_ctx,
250  "TXN",
251  SLAB_DEFAULT_BLOCK_SIZE,
252  sizeof(ReorderBufferTXN));
253 
254  buffer->tup_context = GenerationContextCreate(new_ctx,
255  "Tuples",
256  SLAB_LARGE_BLOCK_SIZE);
257 
258  hash_ctl.keysize = sizeof(TransactionId);
259  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
260  hash_ctl.hcxt = buffer->context;
261 
262  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
263  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
264 
265  buffer->by_txn_last_xid = InvalidTransactionId;
266  buffer->by_txn_last_txn = NULL;
267 
268  buffer->outbuf = NULL;
269  buffer->outbufsize = 0;
270 
271  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
272 
273  dlist_init(&buffer->toplevel_by_lsn);
274 
275  /*
276  * Ensure there's no stale data from prior uses of this slot, in case some
277  * prior exit avoided calling ReorderBufferFree. Failure to do this can
278  * produce duplicated txns, and it's very cheap if there's nothing there.
279  */
280  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
281 
282  return buffer;
283 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:474
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:188
ReplicationSlotPersistentData data
Definition: slot.h:120
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
#define NameStr(name)
Definition: c.h:576
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 670 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

672 {
673  ReorderBufferTXN *txn;
674  ReorderBufferTXN *subtxn;
675  bool new_top;
676  bool new_sub;
677 
678  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
679  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
680 
681  if (new_sub)
682  {
683  /*
684  * we assign subtransactions to top level transaction even if we don't
685  * have data for it yet, assignment records frequently reference xids
686  * that have not yet produced any records. Knowing those aren't top
687  * level xids allows us to make processing cheaper in some places.
688  */
689  dlist_push_tail(&txn->subtxns, &subtxn->node);
690  txn->nsubtxns++;
691  }
692  else if (!subtxn->is_known_as_subxact)
693  {
694  subtxn->is_known_as_subxact = true;
695  Assert(subtxn->nsubtxns == 0);
696 
697  /* remove from lsn order list of top-level transactions */
698  dlist_delete(&subtxn->node);
699 
700  /* add to toplevel transaction */
701  dlist_push_tail(&txn->subtxns, &subtxn->node);
702  txn->nsubtxns++;
703  }
704  else if (new_top)
705  {
706  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
707  }
708 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:699
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1282 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), 
ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1286 {
1287  ReorderBufferTXN *txn;
1288  volatile Snapshot snapshot_now;
1289  volatile CommandId command_id = FirstCommandId;
1290  bool using_subtxn;
1291  ReorderBufferIterTXNState *volatile iterstate = NULL;
1292 
1293  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1294  false);
1295 
1296  /* unknown transaction, nothing to replay */
1297  if (txn == NULL)
1298  return;
1299 
1300  txn->final_lsn = commit_lsn;
1301  txn->end_lsn = end_lsn;
1302  txn->commit_time = commit_time;
1303  txn->origin_id = origin_id;
1304  txn->origin_lsn = origin_lsn;
1305 
1306  /*
1307  * If this transaction didn't have any real changes in our database, it's
1308  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1309  * transferred its snapshot to this transaction if it had one and the
1310  * toplevel tx didn't.
1311  */
1312  if (txn->base_snapshot == NULL)
1313  {
1314  Assert(txn->ninvalidations == 0);
1315  ReorderBufferCleanupTXN(rb, txn);
1316  return;
1317  }
1318 
1319  snapshot_now = txn->base_snapshot;
1320 
1321  /* build data to be able to lookup the CommandIds of catalog tuples */
1322  ReorderBufferBuildTupleCidHash(rb, txn);
1323 
1324  /* setup the initial snapshot */
1325  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1326 
1327  /*
1328  * Decoding needs access to syscaches et al., which in turn use
1329  * heavyweight locks and such. Thus we need to have enough state around to
1330  * keep track of those. The easiest way is to simply use a transaction
1331  * internally. That also allows us to easily enforce that nothing writes
1332  * to the database by checking for xid assignments.
1333  *
1334  * When we're called via the SQL SRF there's already a transaction
1335  * started, so start an explicit subtransaction there.
1336  */
1337  using_subtxn = IsTransactionOrTransactionBlock();
1338 
1339  PG_TRY();
1340  {
1341  ReorderBufferChange *change;
1342  ReorderBufferChange *specinsert = NULL;
1343 
1344  if (using_subtxn)
1345  BeginInternalSubTransaction("replay");
1346  else
1347  StartTransactionCommand();
1348 
1349  rb->begin(rb, txn);
1350 
1351  iterstate = ReorderBufferIterTXNInit(rb, txn);
1352  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1353  {
1354  Relation relation = NULL;
1355  Oid reloid;
1356 
1357  switch (change->action)
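1358  {
1359  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1360 
1361  /*
1362  * Confirmation for speculative insertion arrived. Simply
1363  * use as a normal record. It'll be cleaned up at the end
1364  * of INSERT processing.
1365  */
1366  Assert(specinsert->data.tp.oldtuple == NULL);
1367  change = specinsert;
1368  change->action = REORDER_BUFFER_CHANGE_INSERT;
1369 
1370  /* intentionally fall through */
1371  case REORDER_BUFFER_CHANGE_INSERT:
1372  case REORDER_BUFFER_CHANGE_UPDATE:
1373  case REORDER_BUFFER_CHANGE_DELETE:
1374  Assert(snapshot_now);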
1375 
1376  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1377  change->data.tp.relnode.relNode);
1378 
1379  /*
1380  * Catalog tuple without data, emitted while catalog was
1381  * in the process of being rewritten.
1382  */
1383  if (reloid == InvalidOid &&
1384  change->data.tp.newtuple == NULL &&
1385  change->data.tp.oldtuple == NULL)
1386  goto change_done;
1387  else if (reloid == InvalidOid)
1388  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1389  relpathperm(change->data.tp.relnode,
1390  MAIN_FORKNUM));
1391 
1392  relation = RelationIdGetRelation(reloid);
1393 
1394  if (relation == NULL)
1395  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1396  reloid,
1397  relpathperm(change->data.tp.relnode,
1398  MAIN_FORKNUM));
1399 
1400  if (!RelationIsLogicallyLogged(relation))
1401  goto change_done;
1402 
1403  /*
1404  * Ignore temporary heaps created during DDL unless the
1405  * plugin has asked for them.
1406  */
1407  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1408  goto change_done;
1409 
1410  /*
1411  * For now ignore sequence changes entirely. Most of the
1412  * time they don't log changes using records we
1413  * understand, so it doesn't make sense to handle the few
1414  * cases we do.
1415  */
1416  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1417  goto change_done;
1418 
1419  /* user-triggered change */
1420  if (!IsToastRelation(relation))
1421  {
1422  ReorderBufferToastReplace(rb, txn, relation, change);
1423  rb->apply_change(rb, txn, relation, change);
1424 
1425  /*
1426  * Only clear reassembled toast chunks if we're sure
1427  * they're not required anymore. The creator of the
1428  * tuple tells us.
1429  */
1430  if (change->data.tp.clear_toast_afterwards)
1431  ReorderBufferToastReset(rb, txn);
1432  }
1433  /* we're not interested in toast deletions */
1434  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1435  {
1436  /*
1437  * Need to reassemble the full toasted Datum in
1438  * memory, to ensure the chunks don't get reused till
1439  * we're done remove it from the list of this
1440  * transaction's changes. Otherwise it will get
1441  * freed/reused while restoring spooled data from
1442  * disk.
1443  */
1444  dlist_delete(&change->node);
1445  ReorderBufferToastAppendChunk(rb, txn, relation,
1446  change);
1447  }
1448 
1449  change_done:
1450 
1451  /*
1452  * Either speculative insertion was confirmed, or it was
1453  * unsuccessful and the record isn't needed anymore.
1454  */
1455  if (specinsert != NULL)
1456  {
1457  ReorderBufferReturnChange(rb, specinsert);
1458  specinsert = NULL;
1459  }
1460 
1461  if (relation != NULL)
1462  {
1463  RelationClose(relation);
1464  relation = NULL;
1465  }
1466  break;
1467 
1468  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1469 
1470  /*
1471  * Speculative insertions are dealt with by delaying the
1472  * processing of the insert until the confirmation record
1473  * arrives. For that we simply unlink the record from the
1474  * chain, so it does not get freed/reused while restoring
1475  * spooled data from disk.
1476  *
1477  * This is safe in the face of concurrent catalog changes
1478  * because the relevant relation can't be changed between
1479  * speculative insertion and confirmation due to
1480  * CheckTableNotInUse() and locking.
1481  */
1482 
1483  /* clear out a pending (and thus failed) speculation */
1484  if (specinsert != NULL)
1485  {
1486  ReorderBufferReturnChange(rb, specinsert);
1487  specinsert = NULL;
1488  }
1489 
1490  /* and memorize the pending insertion */
1491  dlist_delete(&change->node);
1492  specinsert = change;
1493  break;
1494 
1495  case REORDER_BUFFER_CHANGE_TRUNCATE:
1496  {
1497  int i;
1498  int nrelids = change->data.truncate.nrelids;
1499  int nrelations = 0;
1500  Relation *relations;
1501 
1502  relations = palloc0(nrelids * sizeof(Relation));
1503  for (i = 0; i < nrelids; i++)
1504  {
1505  Oid relid = change->data.truncate.relids[i];
1506  Relation relation;
1507 
1508  relation = RelationIdGetRelation(relid);
1509 
1510  if (relation == NULL)
1511  elog(ERROR, "could not open relation with OID %u", relid);
1512 
1513  if (!RelationIsLogicallyLogged(relation))
1514  continue;
1515 
1516  relations[nrelations++] = relation;
1517  }
1518 
1519  rb->apply_truncate(rb, txn, nrelations, relations, change);
1520 
1521  for (i = 0; i < nrelations; i++)
1522  RelationClose(relations[i]);
1523 
1524  break;
1525  }
1526 
1527  case REORDER_BUFFER_CHANGE_MESSAGE:
1528  rb->message(rb, txn, change->lsn, true,
1529  change->data.msg.prefix,
1530  change->data.msg.message_size,
1531  change->data.msg.message);
1532  break;
1533 
1534  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1535  /* get rid of the old */
1536  TeardownHistoricSnapshot(false);
1537 
1538  if (snapshot_now->copied)
1539  {
1540  ReorderBufferFreeSnap(rb, snapshot_now);
1541  snapshot_now =
1542  ReorderBufferCopySnap(rb, change->data.snapshot,
1543  txn, command_id);
1544  }
1545 
1546  /*
1547  * Restored from disk, need to be careful not to double
1548  * free. We could introduce refcounting for that, but for
1549  * now this seems infrequent enough not to care.
1550  */
1551  else if (change->data.snapshot->copied)
1552  {
1553  snapshot_now =
1554  ReorderBufferCopySnap(rb, change->data.snapshot,
1555  txn, command_id);
1556  }
1557  else
1558  {
1559  snapshot_now = change->data.snapshot;
1560  }
1561 
1562 
1563  /* and continue with the new one */
1564  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1565  break;
1566 
1567  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1568  Assert(change->data.command_id != InvalidCommandId);
1569 
1570  if (command_id < change->data.command_id)
1571  {
1572  command_id = change->data.command_id;
1573 
1574  if (!snapshot_now->copied)
1575  {
1576  /* we don't use the global one anymore */
1577  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1578  txn, command_id);
1579  }
1580 
1581  snapshot_now->curcid = command_id;
1582 
1583  TeardownHistoricSnapshot(false);
1584  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1585 
1586  /*
1587  * Every time the CommandId is incremented, we could
1588  * see new catalog contents, so execute all
1589  * invalidations.
1590  */
1591  ReorderBufferExecuteInvalidations(rb, txn);
1592  }
1593 
1594  break;
1595 
1596  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1597  elog(ERROR, "tuplecid value in changequeue");
1598  break;
1599  }
1600  }
1601 
1602  /*
1603  * There's a speculative insertion remaining, just clean in up, it
1604  * can't have been successful, otherwise we'd gotten a confirmation
1605  * record.
1606  */
1607  if (specinsert)
1608  {
1609  ReorderBufferReturnChange(rb, specinsert);
1610  specinsert = NULL;
1611  }
1612 
1613  /* clean up the iterator */
1614  ReorderBufferIterTXNFinish(rb, iterstate);
1615  iterstate = NULL;
1616 
1617  /* call commit callback */
1618  rb->commit(rb, txn, commit_lsn);
1619 
1620  /* this is just a sanity check against bad output plugin behaviour */
1621  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1622  elog(ERROR, "output plugin used XID %u",
1623  GetCurrentTransactionId());
1624 
1625  /* cleanup */
1626  TeardownHistoricSnapshot(false);
1627 
1628  /*
1629  * Aborting the current (sub-)transaction as a whole has the right
1630  * semantics. We want all locks acquired in here to be released, not
1631  * reassigned to the parent and we do not want any database access
1632  * have persistent effects.
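1633  */
1634  AbortCurrentTransaction();
1635 
1636  /* make sure there's no cache pollution */
1637  ReorderBufferExecuteInvalidations(rb, txn);
1638 
1639  if (using_subtxn)
1640  RollbackAndReleaseCurrentSubTransaction();
1641 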
1642  if (snapshot_now->copied)
1643  ReorderBufferFreeSnap(rb, snapshot_now);
1644 
1645  /* remove potential on-disk data, and deallocate */
1646  ReorderBufferCleanupTXN(rb, txn);
1647  }
1648  PG_CATCH();
1649  {
1650  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1651  if (iterstate)
1652  ReorderBufferIterTXNFinish(rb, iterstate);
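1653 
1654  TeardownHistoricSnapshot(true);
1655 
1656  /*
1657  * Force cache invalidation to happen outside of a valid transaction
1658  * to prevent catalog access as we just caught an error.
1659  */
1660  AbortCurrentTransaction();
1661 
1662  /* make sure there's no cache pollution */
1663  ReorderBufferExecuteInvalidations(rb, txn);
1664 
1665  if (using_subtxn)
1666  RollbackAndReleaseCurrentSubTransaction();
1667 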
1668  if (snapshot_now->copied)
1669  ReorderBufferFreeSnap(rb, snapshot_now);
1670 
1671  /* remove potential on-disk data, and deallocate */
1672  ReorderBufferCleanupTXN(rb, txn);
1673 
1674  PG_RE_THROW();
1675  }
1676  PG_END_TRY();
1677 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferChange::@102::@104 truncate
uint32 CommandId
Definition: c.h:488
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4440
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:580
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:490
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4251
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:434
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:1996
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
void * palloc0(Size size)
Definition: mcxt.c:955
#define InvalidCommandId
Definition: c.h:491
union ReorderBufferChange::@102 data
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
struct ReorderBufferChange::@102::@105 msg
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:699
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2673
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4146
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
int i
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1906
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 715 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

718 {
719  ReorderBufferTXN *txn;
720  ReorderBufferTXN *subtxn;
721 
722  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
723  InvalidXLogRecPtr, false);
724 
725  /*
726  * No need to do anything if that subtxn didn't contain any changes
727  */
728  if (!subtxn)
729  return;
730 
731  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
732 
733  if (txn == NULL)
734  elog(ERROR, "subxact logged without previous toplevel record");
735 
736  /*
737  * Pass our base snapshot to the parent transaction if it doesn't have
738  * one, or ours is older. That can happen if there are no changes in the
739  * toplevel transaction but in one of the child transactions. This allows
740  * the parent to simply use its base snapshot initially.
741  */
742  if (subtxn->base_snapshot != NULL &&
743  (txn->base_snapshot == NULL ||
744  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
745  {
746  txn->base_snapshot = subtxn->base_snapshot;
747  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
748  subtxn->base_snapshot = NULL;
750  }
751 
752  subtxn->final_lsn = commit_lsn;
753  subtxn->end_lsn = end_lsn;
754 
755  if (!subtxn->is_known_as_subxact)
756  {
757  subtxn->is_known_as_subxact = true;
758  Assert(subtxn->nsubtxns == 0);
759 
760  /* remove from lsn order list of top-level transactions */
761  dlist_delete(&subtxn->node);
762 
763  /* add to subtransaction list */
764  dlist_push_tail(&txn->subtxns, &subtxn->node);
765  txn->nsubtxns++;
766  }
767 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:699
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1776 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1777 {
1778  ReorderBufferTXN *txn;
1779 
1780  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1781  false);
1782 
1783  /* unknown, nothing to forget */
1784  if (txn == NULL)
1785  return;
1786 
1787  /* cosmetic... */
1788  txn->final_lsn = lsn;
1789 
1790  /*
1791  * Process cache invalidation messages if there are any. Even if we're not
1792  * interested in the transaction's contents, it could have manipulated the
1793  * catalog and we need to update the caches according to that.
1794  */
1795  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1797  txn->invalidations);
1798  else
1799  Assert(txn->ninvalidations == 0);
1800 
1801  /* remove potential on-disk data, and deallocate */
1802  ReorderBufferCleanupTXN(rb, txn);
1803 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:699
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer )

Definition at line 289 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

290 {
291  MemoryContext context = rb->context;
292 
293  /*
294  * We free separately allocated data by entirely scrapping reorderbuffer's
295  * memory context.
296  */
297  MemoryContextDelete(context);
298 
299  /* Free disk space used by unconsumed reorder buffers */
301 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:120
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:576
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 360 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

361 {
362  ReorderBufferChange *change;
363 
364  change = (ReorderBufferChange *)
365  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
366 
367  memset(change, 0, sizeof(ReorderBufferChange));
368  return change;
369 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer )

Definition at line 647 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

648 {
649  ReorderBufferTXN *txn;
650 
651  if (dlist_is_empty(&rb->toplevel_by_lsn))
652  return NULL;
653 
654  AssertTXNLsnOrder(rb);
655 
656  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
657 
660  return txn;
661 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer ,
Size  tuple_len 
)

Definition at line 430 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

431 {
432  ReorderBufferTupleBuf *tuple;
433  Size alloc_len;
434 
435  alloc_len = tuple_len + SizeofHeapTupleHeader;
436 
437  tuple = (ReorderBufferTupleBuf *)
438  MemoryContextAlloc(rb->tup_context,
439  sizeof(ReorderBufferTupleBuf) +
440  MAXIMUM_ALIGNOF + alloc_len);
441  tuple->alloc_tuple_size = alloc_len;
442  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
443 
444  return tuple;
445 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:181
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:433
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 1812 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1814 {
1815  bool use_subtxn = IsTransactionOrTransactionBlock();
1816  int i;
1817 
1818  if (use_subtxn)
1819  BeginInternalSubTransaction("replay");
1820 
1821  /*
1822  * Force invalidations to happen outside of a valid transaction - that way
1823  * entries will just be marked as invalid without accessing the catalog.
1824  * That's advantageous because we don't need to setup the full state
1825  * necessary for catalog access.
1826  */
1827  if (use_subtxn)
1829 
1830  for (i = 0; i < ninvalidations; i++)
1831  LocalExecuteInvalidationMessage(&invalidations[i]);
1832 
1833  if (use_subtxn)
1835 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4440
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4251
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4146
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1848 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1849 {
1850  /* many records won't have an xid assigned, centralize check here */
1851  if (xid != InvalidTransactionId)
1852  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1853 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange *  
)

Definition at line 549 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

551 {
552  ReorderBufferTXN *txn;
553 
554  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
555 
556  change->lsn = lsn;
557  Assert(InvalidXLogRecPtr != lsn);
558  dlist_push_tail(&txn->changes, &change->node);
559  txn->nentries++;
560  txn->nentries_mem++;
561 
563 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:699
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 569 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

573 {
574  if (transactional)
575  {
576  MemoryContext oldcontext;
577  ReorderBufferChange *change;
578 
580 
581  oldcontext = MemoryContextSwitchTo(rb->context);
582 
583  change = ReorderBufferGetChange(rb);
585  change->data.msg.prefix = pstrdup(prefix);
586  change->data.msg.message_size = message_size;
587  change->data.msg.message = palloc(message_size);
588  memcpy(change->data.msg.message, message, message_size);
589 
590  ReorderBufferQueueChange(rb, xid, lsn, change);
591 
592  MemoryContextSwitchTo(oldcontext);
593  }
594  else
595  {
596  ReorderBufferTXN *txn = NULL;
597  volatile Snapshot snapshot_now = snapshot;
598 
599  if (xid != InvalidTransactionId)
600  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
601 
602  /* setup snapshot to allow catalog access */
603  SetupHistoricSnapshot(snapshot_now, NULL);
604  PG_TRY();
605  {
606  rb->message(rb, txn, lsn, false, prefix, message_size, message);
607 
609  }
610  PG_CATCH();
611  {
613  PG_RE_THROW();
614  }
615  PG_END_TRY();
616  }
617 }
char * pstrdup(const char *in)
Definition: mcxt.c:1161
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@105 msg
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:699
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:924
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange *  
)

Definition at line 378 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

379 {
380  /* free contained data */
381  switch (change->action)
382  {
387  if (change->data.tp.newtuple)
388  {
389  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
390  change->data.tp.newtuple = NULL;
391  }
392 
393  if (change->data.tp.oldtuple)
394  {
395  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
396  change->data.tp.oldtuple = NULL;
397  }
398  break;
400  if (change->data.msg.prefix != NULL)
401  pfree(change->data.msg.prefix);
402  change->data.msg.prefix = NULL;
403  if (change->data.msg.message != NULL)
404  pfree(change->data.msg.message);
405  change->data.msg.message = NULL;
406  break;
408  if (change->data.snapshot)
409  {
410  ReorderBufferFreeSnap(rb, change->data.snapshot);
411  change->data.snapshot = NULL;
412  }
413  break;
414  /* no data in addition to the struct itself */
419  break;
420  }
421 
422  pfree(change);
423 }
void pfree(void *pointer)
Definition: mcxt.c:1031
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf *  tuple 
)

Definition at line 454 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

455 {
456  pfree(tuple);
457 }
void pfree(void *pointer)
Definition: mcxt.c:1031

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer ,
XLogRecPtr  ptr 
)

Definition at line 664 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

665 {
666  rb->current_restart_decoding_lsn = ptr;
667 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 2014 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2015 {
2016  ReorderBufferTXN *txn;
2017 
2018  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2019  false);
2020 
2021  /* transaction isn't known yet, ergo no snapshot */
2022  if (txn == NULL)
2023  return false;
2024 
2025  /*
2026  * TODO: It would be a nice improvement if we would check the toplevel
2027  * transaction in subtransactions, but we'd need to keep track of a bit
2028  * more state.
2029  */
2030  return txn->base_snapshot != NULL;
2031 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1998 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

1999 {
2000  ReorderBufferTXN *txn;
2001 
2002  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2003  false);
2004  if (txn == NULL)
2005  return false;
2006 
2007  return txn->has_catalog_changes;
2008 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1983 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1985 {
1986  ReorderBufferTXN *txn;
1987 
1988  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1989 
1990  txn->has_catalog_changes = true;
1991 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 2673 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

2674 {
2675  DIR *logical_dir;
2676  struct dirent *logical_de;
2677 
2678  logical_dir = AllocateDir("pg_replslot");
2679  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2680  {
2681  if (strcmp(logical_de->d_name, ".") == 0 ||
2682  strcmp(logical_de->d_name, "..") == 0)
2683  continue;
2684 
2685  /* if it cannot be a slot, skip the directory */
2686  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2687  continue;
2688 
2689  /*
2690  * ok, has to be a surviving logical slot, iterate and delete
2691  * everything starting with xid-*
2692  */
2694  }
2695  FreeDir(logical_dir);
2696 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2600
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2666
char d_name[MAX_PATH]
Definition: dirent.h:14
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2718