PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Macro Definition Documentation

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

Definition at line 277 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 280 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 287 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 292 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 298 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 

Definition at line 52 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1639 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1640 {
1641  ReorderBufferTXN *txn;
1642 
1643  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1644  false);
1645 
1646  /* unknown, nothing to remove */
1647  if (txn == NULL)
1648  return;
1649 
1650  /* cosmetic... */
1651  txn->final_lsn = lsn;
1652 
1653  /* remove potential on-disk data, and deallocate */
1654  ReorderBufferCleanupTXN(rb, txn);
1655 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1665 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1666 {
1667  dlist_mutable_iter it;
1668 
1669  /*
1670  * Iterate through all (potential) toplevel TXNs and abort all that are
1671  * older than what possibly can be running. Once we've found the first
1672  * that is alive we stop, there might be some that acquired an xid earlier
1673  * but started writing later, but it's unlikely and they will be cleaned
1674  * up in a later call to this function.
1675  */
1676  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1677  {
1678  ReorderBufferTXN *txn;
1679 
1680  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1681 
1682  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1683  {
1684  /*
1685  * We set final_lsn on a transaction when we decode its commit or
1686  * abort record, but we never see those records for crashed
1687  * transactions. To ensure cleanup of these transactions, set
1688  * final_lsn to that of their last change; this causes
1689  * ReorderBufferRestoreCleanup to do the right thing.
1690  */
1691  if (txn->serialized && txn->final_lsn == 0)
1692  {
1693  ReorderBufferChange *last =
1694  dlist_tail_element(ReorderBufferChange, node, &txn->changes);
1695 
1696  txn->final_lsn = last->lsn;
1697  }
1698 
1699  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1700 
1701  /* remove potential on-disk data, and deallocate this tx */
1702  ReorderBufferCleanupTXN(rb, txn);
1703  }
1704  else
1705  return;
1706  }
1707 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog
Definition: elog.h:219

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 1892 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1895 {
1896  ReorderBufferTXN *txn;
1897 
1898  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1899 
1900  if (txn->ninvalidations != 0)
1901  elog(ERROR, "only ever add one set of invalidations");
1902 
1903  Assert(nmsgs > 0);
1904 
1905  txn->ninvalidations = nmsgs;
1906  txn->invalidations = (SharedInvalidationMessage *)
1907  MemoryContextAlloc(rb->context,
1908  sizeof(SharedInvalidationMessage) * nmsgs);
1909  memcpy(txn->invalidations, msgs,
1910  sizeof(SharedInvalidationMessage) * nmsgs);
1911 }
#define ERROR
Definition: elog.h:43
#define Assert(condition)
Definition: c.h:688
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
#define elog
Definition: elog.h:219

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1848 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

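1850 {
1851  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1852 
1853  change->data.command_id = cid;
1854  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1855 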
1856  ReorderBufferQueueChange(rb, xid, lsn, change);
1857 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1864 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

1868 {
1869  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1870  ReorderBufferTXN *txn;
1871 
1872  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1873 
1874  change->data.tuplecid.node = node;
1875  change->data.tuplecid.tid = tid;
1876  change->data.tuplecid.cmin = cmin;
1877  change->data.tuplecid.cmax = cmax;
1878  change->data.tuplecid.combocid = combocid;
1879  change->lsn = lsn;
1880  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1881 
1882  dlist_push_tail(&txn->tuplecids, &change->node);
1883  txn->ntuplecids++;
1884 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
union ReorderBufferChange::@102 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
struct ReorderBufferChange::@102::@105 tuplecid
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 220 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

221 {
222  ReorderBuffer *buffer;
223  HASHCTL hash_ctl;
224  MemoryContext new_ctx;
225 
226  /* allocate memory in own context, to have better accountability */
227  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
228  "ReorderBuffer",
229  ALLOCSET_DEFAULT_SIZES);
230 
231  buffer =
232  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
233 
234  memset(&hash_ctl, 0, sizeof(hash_ctl));
235 
236  buffer->context = new_ctx;
237 
238  buffer->change_context = SlabContextCreate(new_ctx,
239  "Change",
240  0,
241  SLAB_DEFAULT_BLOCK_SIZE,
242  sizeof(ReorderBufferChange));
243 
244  buffer->txn_context = SlabContextCreate(new_ctx,
245  "TXN",
246  0,
247  SLAB_DEFAULT_BLOCK_SIZE,
248  sizeof(ReorderBufferTXN));
249 
250  buffer->tup_context = GenerationContextCreate(new_ctx,
251  "Tuples",
252  0,
253  SLAB_LARGE_BLOCK_SIZE);
254 
255  hash_ctl.keysize = sizeof(TransactionId);
256  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
257  hash_ctl.hcxt = buffer->context;
258 
259  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
260  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
261 
262  buffer->by_txn_last_xid = InvalidTransactionId;
263  buffer->by_txn_last_txn = NULL;
264 
265  buffer->outbuf = NULL;
266  buffer->outbufsize = 0;
267 
268  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
269 
270  dlist_init(&buffer->toplevel_by_lsn);
271 
272  return buffer;
273 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:463
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext change_context
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, int flags, Size blockSize, Size chunkSize)
Definition: slab.c:190
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, int flags, Size blockSize)
Definition: generation.c:208
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:227
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:226
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 656 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

658 {
659  ReorderBufferTXN *txn;
660  ReorderBufferTXN *subtxn;
661  bool new_top;
662  bool new_sub;
663 
664  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
665  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
666 
667  if (new_sub)
668  {
669  /*
670  * we assign subtransactions to top level transaction even if we don't
671  * have data for it yet, assignment records frequently reference xids
672  * that have not yet produced any records. Knowing those aren't top
673  * level xids allows us to make processing cheaper in some places.
674  */
675  dlist_push_tail(&txn->subtxns, &subtxn->node);
676  txn->nsubtxns++;
677  }
678  else if (!subtxn->is_known_as_subxact)
679  {
680  subtxn->is_known_as_subxact = true;
681  Assert(subtxn->nsubtxns == 0);
682 
683  /* remove from lsn order list of top-level transactions */
684  dlist_delete(&subtxn->node);
685 
686  /* add to toplevel transaction */
687  dlist_push_tail(&txn->subtxns, &subtxn->node);
688  txn->nsubtxns++;
689  }
690  else if (new_top)
691  {
692  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
693  }
694 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:688
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1268 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1272 {
1273  ReorderBufferTXN *txn;
1274  volatile Snapshot snapshot_now;
1275  volatile CommandId command_id = FirstCommandId;
1276  bool using_subtxn;
1277  ReorderBufferIterTXNState *volatile iterstate = NULL;
1278 
1279  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1280  false);
1281 
1282  /* unknown transaction, nothing to replay */
1283  if (txn == NULL)
1284  return;
1285 
1286  txn->final_lsn = commit_lsn;
1287  txn->end_lsn = end_lsn;
1288  txn->commit_time = commit_time;
1289  txn->origin_id = origin_id;
1290  txn->origin_lsn = origin_lsn;
1291 
1292  /*
1293  * If this transaction didn't have any real changes in our database, it's
1294  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1295  * transferred its snapshot to this transaction if it had one and the
1296  * toplevel tx didn't.
1297  */
1298  if (txn->base_snapshot == NULL)
1299  {
1300  Assert(txn->ninvalidations == 0);
1301  ReorderBufferCleanupTXN(rb, txn);
1302  return;
1303  }
1304 
1305  snapshot_now = txn->base_snapshot;
1306 
1307  /* build data to be able to lookup the CommandIds of catalog tuples */
1308  ReorderBufferBuildTupleCidHash(rb, txn);
1309 
1310  /* setup the initial snapshot */
1311  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1312 
1313  /*
1314  * Decoding needs access to syscaches et al., which in turn use
1315  * heavyweight locks and such. Thus we need to have enough state around to
1316  * keep track of those. The easiest way is to simply use a transaction
1317  * internally. That also allows us to easily enforce that nothing writes
1318  * to the database by checking for xid assignments.
1319  *
1320  * When we're called via the SQL SRF there's already a transaction
1321  * started, so start an explicit subtransaction there.
1322  */
1323  using_subtxn = IsTransactionOrTransactionBlock();
1324 
1325  PG_TRY();
1326  {
1327  ReorderBufferChange *change;
1328  ReorderBufferChange *specinsert = NULL;
1329 
1330  if (using_subtxn)
1331  BeginInternalSubTransaction("replay");
1332  else
1333  StartTransactionCommand();
1334 
1335  rb->begin(rb, txn);
1336 
1337  iterstate = ReorderBufferIterTXNInit(rb, txn);
1338  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1339  {
1340  Relation relation = NULL;
1341  Oid reloid;
1342 
1343  switch (change->action)
1344  {
1345  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1346 
1347  /*
1348  * Confirmation for speculative insertion arrived. Simply
1349  * use as a normal record. It'll be cleaned up at the end
1350  * of INSERT processing.
1351  */
1352  Assert(specinsert->data.tp.oldtuple == NULL);
1353  change = specinsert;
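1354  change->action = REORDER_BUFFER_CHANGE_INSERT;
1355 
1356  /* intentionally fall through */
1357  case REORDER_BUFFER_CHANGE_INSERT:
1358  case REORDER_BUFFER_CHANGE_UPDATE:
1359  case REORDER_BUFFER_CHANGE_DELETE:
1360  Assert(snapshot_now);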
1361 
1362  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1363  change->data.tp.relnode.relNode);
1364 
1365  /*
1366  * Catalog tuple without data, emitted while catalog was
1367  * in the process of being rewritten.
1368  */
1369  if (reloid == InvalidOid &&
1370  change->data.tp.newtuple == NULL &&
1371  change->data.tp.oldtuple == NULL)
1372  goto change_done;
1373  else if (reloid == InvalidOid)
1374  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1375  relpathperm(change->data.tp.relnode,
1376  MAIN_FORKNUM));
1377 
1378  relation = RelationIdGetRelation(reloid);
1379 
1380  if (relation == NULL)
1381  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1382  reloid,
1383  relpathperm(change->data.tp.relnode,
1384  MAIN_FORKNUM));
1385 
1386  if (!RelationIsLogicallyLogged(relation))
1387  goto change_done;
1388 
1389  /*
1390  * For now ignore sequence changes entirely. Most of the
1391  * time they don't log changes using records we
1392  * understand, so it doesn't make sense to handle the few
1393  * cases we do.
1394  */
1395  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1396  goto change_done;
1397 
1398  /* user-triggered change */
1399  if (!IsToastRelation(relation))
1400  {
1401  ReorderBufferToastReplace(rb, txn, relation, change);
1402  rb->apply_change(rb, txn, relation, change);
1403 
1404  /*
1405  * Only clear reassembled toast chunks if we're sure
1406  * they're not required anymore. The creator of the
1407  * tuple tells us.
1408  */
1409  if (change->data.tp.clear_toast_afterwards)
1410  ReorderBufferToastReset(rb, txn);
1411  }
1412  /* we're not interested in toast deletions */
1413  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1414  {
1415  /*
1416  * Need to reassemble the full toasted Datum in
1417  * memory, to ensure the chunks don't get reused till
1418  * we're done remove it from the list of this
1419  * transaction's changes. Otherwise it will get
1420  * freed/reused while restoring spooled data from
1421  * disk.
1422  */
1423  dlist_delete(&change->node);
1424  ReorderBufferToastAppendChunk(rb, txn, relation,
1425  change);
1426  }
1427 
1428  change_done:
1429 
1430  /*
1431  * Either speculative insertion was confirmed, or it was
1432  * unsuccessful and the record isn't needed anymore.
1433  */
1434  if (specinsert != NULL)
1435  {
1436  ReorderBufferReturnChange(rb, specinsert);
1437  specinsert = NULL;
1438  }
1439 
1440  if (relation != NULL)
1441  {
1442  RelationClose(relation);
1443  relation = NULL;
1444  }
1445  break;
1446 
1447  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1448 
1449  /*
1450  * Speculative insertions are dealt with by delaying the
1451  * processing of the insert until the confirmation record
1452  * arrives. For that we simply unlink the record from the
1453  * chain, so it does not get freed/reused while restoring
1454  * spooled data from disk.
1455  *
1456  * This is safe in the face of concurrent catalog changes
1457  * because the relevant relation can't be changed between
1458  * speculative insertion and confirmation due to
1459  * CheckTableNotInUse() and locking.
1460  */
1461 
1462  /* clear out a pending (and thus failed) speculation */
1463  if (specinsert != NULL)
1464  {
1465  ReorderBufferReturnChange(rb, specinsert);
1466  specinsert = NULL;
1467  }
1468 
1469  /* and memorize the pending insertion */
1470  dlist_delete(&change->node);
1471  specinsert = change;
1472  break;
1473 
1474  case REORDER_BUFFER_CHANGE_MESSAGE:
1475  rb->message(rb, txn, change->lsn, true,
1476  change->data.msg.prefix,
1477  change->data.msg.message_size,
1478  change->data.msg.message);
1479  break;
1480 
1481  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1482  /* get rid of the old */
1483  TeardownHistoricSnapshot(false);
1484 
1485  if (snapshot_now->copied)
1486  {
1487  ReorderBufferFreeSnap(rb, snapshot_now);
1488  snapshot_now =
1489  ReorderBufferCopySnap(rb, change->data.snapshot,
1490  txn, command_id);
1491  }
1492 
1493  /*
1494  * Restored from disk, need to be careful not to double
1495  * free. We could introduce refcounting for that, but for
1496  * now this seems infrequent enough not to care.
1497  */
1498  else if (change->data.snapshot->copied)
1499  {
1500  snapshot_now =
1501  ReorderBufferCopySnap(rb, change->data.snapshot,
1502  txn, command_id);
1503  }
1504  else
1505  {
1506  snapshot_now = change->data.snapshot;
1507  }
1508 
1509 
1510  /* and continue with the new one */
1511  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1512  break;
1513 
1514  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1515  Assert(change->data.command_id != InvalidCommandId);
1516 
1517  if (command_id < change->data.command_id)
1518  {
1519  command_id = change->data.command_id;
1520 
1521  if (!snapshot_now->copied)
1522  {
1523  /* we don't use the global one anymore */
1524  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1525  txn, command_id);
1526  }
1527 
1528  snapshot_now->curcid = command_id;
1529 
1530  TeardownHistoricSnapshot(false);
1531  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1532 
1533  /*
1534  * Every time the CommandId is incremented, we could
1535  * see new catalog contents, so execute all
1536  * invalidations.
1537  */
1538  ReorderBufferExecuteInvalidations(rb, txn);
1539  }
1540 
1541  break;
1542 
1543  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1544  elog(ERROR, "tuplecid value in changequeue");
1545  break;
1546  }
1547  }
1548 
1549  /*
1550  * There's a speculative insertion remaining, just clean in up, it
1551  * can't have been successful, otherwise we'd gotten a confirmation
1552  * record.
1553  */
1554  if (specinsert)
1555  {
1556  ReorderBufferReturnChange(rb, specinsert);
1557  specinsert = NULL;
1558  }
1559 
1560  /* clean up the iterator */
1561  ReorderBufferIterTXNFinish(rb, iterstate);
1562  iterstate = NULL;
1563 
1564  /* call commit callback */
1565  rb->commit(rb, txn, commit_lsn);
1566 
1567  /* this is just a sanity check against bad output plugin behaviour */
1568  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1569  elog(ERROR, "output plugin used XID %u",
1570  GetCurrentTransactionId());
1571 
1572  /* cleanup */
1573  TeardownHistoricSnapshot(false);
1574 
1575  /*
1576  * Aborting the current (sub-)transaction as a whole has the right
1577  * semantics. We want all locks acquired in here to be released, not
1578  * reassigned to the parent and we do not want any database access
1579  * have persistent effects.
1580  */
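1581  AbortCurrentTransaction();
1582 
1583  /* make sure there's no cache pollution */
1584  ReorderBufferExecuteInvalidations(rb, txn);
1585 
1586  if (using_subtxn)
1587  RollbackAndReleaseCurrentSubTransaction();
1588 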
1589  if (snapshot_now->copied)
1590  ReorderBufferFreeSnap(rb, snapshot_now);
1591 
1592  /* remove potential on-disk data, and deallocate */
1593  ReorderBufferCleanupTXN(rb, txn);
1594  }
1595  PG_CATCH();
1596  {
1597  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1598  if (iterstate)
1599  ReorderBufferIterTXNFinish(rb, iterstate);
1600 
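1601  TeardownHistoricSnapshot(true);
1602 
1603  /*
1604  * Force cache invalidation to happen outside of a valid transaction
1605  * to prevent catalog access as we just caught an error.
1606  */
1607  AbortCurrentTransaction();
1608 
1609  /* make sure there's no cache pollution */
1610  ReorderBufferExecuteInvalidations(rb, txn);
1611 
1612  if (using_subtxn)
1613  RollbackAndReleaseCurrentSubTransaction();
1614 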
1615  if (snapshot_now->copied)
1616  ReorderBufferFreeSnap(rb, snapshot_now);
1617 
1618  /* remove potential on-disk data, and deallocate */
1619  ReorderBufferCleanupTXN(rb, txn);
1620 
1621  PG_RE_THROW();
1622  }
1623  PG_END_TRY();
1624 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:477
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2985
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4466
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:584
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:479
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4277
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:435
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2133
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:480
union ReorderBufferChange::@102 data
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
struct ReorderBufferChange::@102::@104 msg
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:688
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2674
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4172
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2043
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 701 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

704 {
705  ReorderBufferTXN *txn;
706  ReorderBufferTXN *subtxn;
707 
708  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
709  InvalidXLogRecPtr, false);
710 
711  /*
712  * No need to do anything if that subtxn didn't contain any changes
713  */
714  if (!subtxn)
715  return;
716 
717  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
718 
719  if (txn == NULL)
720  elog(ERROR, "subxact logged without previous toplevel record");
721 
722  /*
723  * Pass our base snapshot to the parent transaction if it doesn't have
724  * one, or ours is older. That can happen if there are no changes in the
725  * toplevel transaction but in one of the child transactions. This allows
726  * the parent to simply use its base snapshot initially.
727  */
728  if (subtxn->base_snapshot != NULL &&
729  (txn->base_snapshot == NULL ||
730  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
731  {
732  txn->base_snapshot = subtxn->base_snapshot;
733  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
734  subtxn->base_snapshot = NULL;
735  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
736  }
737 
738  subtxn->final_lsn = commit_lsn;
739  subtxn->end_lsn = end_lsn;
740 
741  if (!subtxn->is_known_as_subxact)
742  {
743  subtxn->is_known_as_subxact = true;
744  Assert(subtxn->nsubtxns == 0);
745 
746  /* remove from lsn order list of top-level transactions */
747  dlist_delete(&subtxn->node);
748 
749  /* add to subtransaction list */
750  dlist_push_tail(&txn->subtxns, &subtxn->node);
751  txn->nsubtxns++;
752  }
753 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:688
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1723 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1724 {
1725  ReorderBufferTXN *txn;
1726 
1727  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1728  false);
1729 
1730  /* unknown, nothing to forget */
1731  if (txn == NULL)
1732  return;
1733 
1734  /* cosmetic... */
1735  txn->final_lsn = lsn;
1736 
1737  /*
1738  * Process cache invalidation messages if there are any. Even if we're not
1739  * interested in the transaction's contents, it could have manipulated the
1740  * catalog and we need to update the caches according to that.
1741  */
1742  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1743  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1744  txn->invalidations);
1745  else
1746  Assert(txn->ninvalidations == 0);
1747 
1748  /* remove potential on-disk data, and deallocate */
1749  ReorderBufferCleanupTXN(rb, txn);
1750 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:688
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer )

Definition at line 279 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

280 {
281  MemoryContext context = rb->context;
282 
283  /*
284  * We free separately allocated data by entirely scrapping reorderbuffer's
285  * memory context.
286  */
287  MemoryContextDelete(context);
288 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer )

Definition at line 347 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

348 {
349  ReorderBufferChange *change;
350 
351  change = (ReorderBufferChange *)
352  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
353 
354  memset(change, 0, sizeof(ReorderBufferChange));
355  return change;
356 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer )

Definition at line 633 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

634 {
635  ReorderBufferTXN *txn;
636 
637  if (dlist_is_empty(&rb->toplevel_by_lsn))
638  return NULL;
639 
640  AssertTXNLsnOrder(rb);
641 
642  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
643 
644  Assert(!txn->is_known_as_subxact);
645  Assert(txn->first_lsn != InvalidXLogRecPtr);
646  return txn;
647 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:688
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer ,
Size  tuple_len 
)

Definition at line 416 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

417 {
418  ReorderBufferTupleBuf *tuple;
419  Size alloc_len;
420 
421  alloc_len = tuple_len + SizeofHeapTupleHeader;
422 
423  tuple = (ReorderBufferTupleBuf *)
424  MemoryContextAlloc(rb->tup_context,
425  sizeof(ReorderBufferTupleBuf) +
426  MAXIMUM_ALIGNOF + alloc_len);
427  tuple->alloc_tuple_size = alloc_len;
428  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
429 
430  return tuple;
431 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:422
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 1759 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1761 {
1762  bool use_subtxn = IsTransactionOrTransactionBlock();
1763  int i;
1764 
1765  if (use_subtxn)
1766  BeginInternalSubTransaction("replay");
1767 
1768  /*
1769  * Force invalidations to happen outside of a valid transaction - that way
1770  * entries will just be marked as invalid without accessing the catalog.
1771  * That's advantageous because we don't need to setup the full state
1772  * necessary for catalog access.
1773  */
1774  if (use_subtxn)
1775  AbortCurrentTransaction();
1776 
1777  for (i = 0; i < ninvalidations; i++)
1778  LocalExecuteInvalidationMessage(&invalidations[i]);
1779 
1780  if (use_subtxn)
1781  RollbackAndReleaseCurrentSubTransaction();
1782 }
void AbortCurrentTransaction(void)
Definition: xact.c:2985
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4466
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4277
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4172
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1795 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1796 {
1797  /* many records won't have an xid assigned, centralize check here */
1798  if (xid != InvalidTransactionId)
1799  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1800 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange * 
)

Definition at line 535 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

537 {
538  ReorderBufferTXN *txn;
539 
540  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
541 
542  change->lsn = lsn;
543  Assert(InvalidXLogRecPtr != lsn);
544  dlist_push_tail(&txn->changes, &change->node);
545  txn->nentries++;
546  txn->nentries_mem++;
547 
548  ReorderBufferCheckSerializeTXN(rb, txn);
549 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:688
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 555 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

559 {
560  if (transactional)
561  {
562  MemoryContext oldcontext;
563  ReorderBufferChange *change;
564 
565  Assert(xid != InvalidTransactionId);
566 
567  oldcontext = MemoryContextSwitchTo(rb->context);
568 
569  change = ReorderBufferGetChange(rb);
570  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
571  change->data.msg.prefix = pstrdup(prefix);
572  change->data.msg.message_size = message_size;
573  change->data.msg.message = palloc(message_size);
574  memcpy(change->data.msg.message, message, message_size);
575 
576  ReorderBufferQueueChange(rb, xid, lsn, change);
577 
578  MemoryContextSwitchTo(oldcontext);
579  }
580  else
581  {
582  ReorderBufferTXN *txn = NULL;
583  volatile Snapshot snapshot_now = snapshot;
584 
585  if (xid != InvalidTransactionId)
586  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
587 
588  /* setup snapshot to allow catalog access */
589  SetupHistoricSnapshot(snapshot_now, NULL);
590  PG_TRY();
591  {
592  rb->message(rb, txn, lsn, false, prefix, message_size, message);
593 
594  TeardownHistoricSnapshot(false);
595  }
596  PG_CATCH();
597  {
598  TeardownHistoricSnapshot(true);
599  PG_RE_THROW();
600  }
601  PG_END_TRY();
602  }
603 }
char * pstrdup(const char *in)
Definition: mcxt.c:1063
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@104 msg
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:688
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:835
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer ,
ReorderBufferChange  
)

Definition at line 365 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

366 {
367  /* free contained data */
368  switch (change->action)
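369  {
370  case REORDER_BUFFER_CHANGE_INSERT:
371  case REORDER_BUFFER_CHANGE_UPDATE:
372  case REORDER_BUFFER_CHANGE_DELETE:
373  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
374  if (change->data.tp.newtuple)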
375  {
376  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
377  change->data.tp.newtuple = NULL;
378  }
379 
380  if (change->data.tp.oldtuple)
381  {
382  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
383  change->data.tp.oldtuple = NULL;
384  }
385  break;
386  case REORDER_BUFFER_CHANGE_MESSAGE:
387  if (change->data.msg.prefix != NULL)
388  pfree(change->data.msg.prefix);
389  change->data.msg.prefix = NULL;
390  if (change->data.msg.message != NULL)
391  pfree(change->data.msg.message);
392  change->data.msg.message = NULL;
393  break;
394  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
395  if (change->data.snapshot)
396  {
397  ReorderBufferFreeSnap(rb, change->data.snapshot);
398  change->data.snapshot = NULL;
399  }
400  break;
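401  /* no data in addition to the struct itself */
402  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
403  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
404  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
405  break;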
406  }
407 
408  pfree(change);
409 }
void pfree(void *pointer)
Definition: mcxt.c:936
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf * tuple 
)

Definition at line 440 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

441 {
442  pfree(tuple);
443 }
void pfree(void *pointer)
Definition: mcxt.c:936

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer ,
XLogRecPtr  ptr 
)

Definition at line 650 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

651 {
652  rb->current_restart_decoding_lsn = ptr;
653 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1961 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1962 {
1963  ReorderBufferTXN *txn;
1964 
1965  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1966  false);
1967 
1968  /* transaction isn't known yet, ergo no snapshot */
1969  if (txn == NULL)
1970  return false;
1971 
1972  /*
1973  * TODO: It would be a nice improvement if we would check the toplevel
1974  * transaction in subtransactions, but we'd need to keep track of a bit
1975  * more state.
1976  */
1977  return txn->base_snapshot != NULL;
1978 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1945 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

1946 {
1947  ReorderBufferTXN *txn;
1948 
1949  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1950  false);
1951  if (txn == NULL)
1952  return false;
1953 
1954  return txn->has_catalog_changes;
1955 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1930 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1932 {
1933  ReorderBufferTXN *txn;
1934 
1935  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1936 
1937  txn->has_catalog_changes = true;
1938 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 2576 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, PANIC, ReadDir(), ReplicationSlotValidateName(), S_ISDIR, and stat.

Referenced by StartupXLOG().

2577 {
2578  DIR *logical_dir;
2579  struct dirent *logical_de;
2580 
2581  DIR *spill_dir;
2582  struct dirent *spill_de;
2583 
2584  logical_dir = AllocateDir("pg_replslot");
2585  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2586  {
2587  struct stat statbuf;
2588  char path[MAXPGPATH * 2 + 12];
2589 
2590  if (strcmp(logical_de->d_name, ".") == 0 ||
2591  strcmp(logical_de->d_name, "..") == 0)
2592  continue;
2593 
2594  /* if it cannot be a slot, skip the directory */
2595  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2596  continue;
2597 
2598  /*
2599  * ok, has to be a surviving logical slot, iterate and delete
2600  * everything starting with xid-*
2601  */
2602  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2603 
2604  /* we're only creating directories here, skip if it's not ours */
2605  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2606  continue;
2607 
2608  spill_dir = AllocateDir(path);
2609  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2610  {
2611  if (strcmp(spill_de->d_name, ".") == 0 ||
2612  strcmp(spill_de->d_name, "..") == 0)
2613  continue;
2614 
2615  /* only look at names that can be ours */
2616  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2617  {
2618  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2619  spill_de->d_name);
2620 
2621  if (unlink(path) != 0)
2622  ereport(PANIC,
2623  (errcode_for_file_access(),
2624  errmsg("could not remove file \"%s\": %m",
2625  path)));
2626  }
2627  }
2628  FreeDir(spill_dir);
2629  }
2630  FreeDir(logical_dir);
2631 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2607
#define ereport(elevel, rest)
Definition: elog.h:122
#define stat(a, b)
Definition: win32_port.h:266
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2673
#define S_ISDIR(m)
Definition: win32_port.h:307
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
int FreeDir(DIR *dir)
Definition: fd.c:2725