PostgreSQL Source Code  git master
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Macro Definition Documentation

◆ ReorderBufferTupleBufData

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

◆ ReorderBuffer

Definition at line 275 of file reorderbuffer.h.

◆ ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 278 of file reorderbuffer.h.

◆ ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 285 of file reorderbuffer.h.

◆ ReorderBufferChange

◆ ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 290 of file reorderbuffer.h.

◆ ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 296 of file reorderbuffer.h.

◆ ReorderBufferTupleBuf

◆ ReorderBufferTXN

Enumeration Type Documentation

◆ ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 

Definition at line 52 of file reorderbuffer.h.

Function Documentation

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1636 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1637 {
1638  ReorderBufferTXN *txn;
1639 
1640  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1641  false);
1642 
1643  /* unknown, nothing to remove */
1644  if (txn == NULL)
1645  return;
1646 
1647  /* cosmetic... */
1648  txn->final_lsn = lsn;
1649 
1650  /* remove potential on-disk data, and deallocate */
1651  ReorderBufferCleanupTXN(rb, txn);
1652 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1662 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1663 {
1664  dlist_mutable_iter it;
1665 
1666  /*
1667  * Iterate through all (potential) toplevel TXNs and abort all that are
1668  * older than what possibly can be running. Once we've found the first
1669  * that is alive we stop, there might be some that acquired an xid earlier
1670  * but started writing later, but it's unlikely and they will be cleaned up
1671  * in a later call to ReorderBufferAbortOld().
1672  */
1673  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1674  {
1675  ReorderBufferTXN *txn;
1676 
1677  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1678 
1679  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1680  {
1681  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1682 
1683  /* remove potential on-disk data, and deallocate this tx */
1684  ReorderBufferCleanupTXN(rb, txn);
1685  }
1686  else
1687  return;
1688  }
1689 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog
Definition: elog.h:219

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs
)

Definition at line 1874 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1877 {
1878  ReorderBufferTXN *txn;
1879 
1880  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1881 
1882  if (txn->ninvalidations != 0)
1883  elog(ERROR, "only ever add one set of invalidations");
1884 
1885  Assert(nmsgs > 0);
1886 
1887  txn->ninvalidations = nmsgs;
1889  MemoryContextAlloc(rb->context,
1890  sizeof(SharedInvalidationMessage) * nmsgs);
1891  memcpy(txn->invalidations, msgs,
1892  sizeof(SharedInvalidationMessage) * nmsgs);
1893 }
#define ERROR
Definition: elog.h:43
#define Assert(condition)
Definition: c.h:670
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
#define elog
Definition: elog.h:219

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1830 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1832 {
1834 
1835  change->data.command_id = cid;
1837 
1838  ReorderBufferQueueChange(rb, xid, lsn, change);
1839 }
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1846 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

1850 {
1852  ReorderBufferTXN *txn;
1853 
1854  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1855 
1856  change->data.tuplecid.node = node;
1857  change->data.tuplecid.tid = tid;
1858  change->data.tuplecid.cmin = cmin;
1859  change->data.tuplecid.cmax = cmax;
1860  change->data.tuplecid.combocid = combocid;
1861  change->lsn = lsn;
1863 
1864  dlist_push_tail(&txn->tuplecids, &change->node);
1865  txn->ntuplecids++;
1866 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@93::@96 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 220 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

221 {
223  HASHCTL hash_ctl;
224  MemoryContext new_ctx;
225 
226  /* allocate memory in own context, to have better accountability */
228  "ReorderBuffer",
230 
231  buffer =
232  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
233 
234  memset(&hash_ctl, 0, sizeof(hash_ctl));
235 
236  buffer->context = new_ctx;
237 
238  buffer->change_context = SlabContextCreate(new_ctx,
239  "Change",
241  sizeof(ReorderBufferChange));
242 
243  buffer->txn_context = SlabContextCreate(new_ctx,
244  "TXN",
246  sizeof(ReorderBufferTXN));
247 
248  buffer->tup_context = GenerationContextCreate(new_ctx,
249  "Tuples",
251 
252  hash_ctl.keysize = sizeof(TransactionId);
253  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
254  hash_ctl.hcxt = buffer->context;
255 
256  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
258 
260  buffer->by_txn_last_txn = NULL;
261 
262  buffer->outbuf = NULL;
263  buffer->outbufsize = 0;
264 
266 
267  dlist_init(&buffer->toplevel_by_lsn);
268 
269  return buffer;
270 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:445
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:200
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:170
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:200
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:199
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 653 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

655 {
656  ReorderBufferTXN *txn;
657  ReorderBufferTXN *subtxn;
658  bool new_top;
659  bool new_sub;
660 
661  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
662  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
663 
664  if (new_sub)
665  {
666  /*
667  * we assign subtransactions to top level transaction even if we don't
668  * have data for it yet, assignment records frequently reference xids
669  * that have not yet produced any records. Knowing those aren't top
670  * level xids allows us to make processing cheaper in some places.
671  */
672  dlist_push_tail(&txn->subtxns, &subtxn->node);
673  txn->nsubtxns++;
674  }
675  else if (!subtxn->is_known_as_subxact)
676  {
677  subtxn->is_known_as_subxact = true;
678  Assert(subtxn->nsubtxns == 0);
679 
680  /* remove from lsn order list of top-level transactions */
681  dlist_delete(&subtxn->node);
682 
683  /* add to toplevel transaction */
684  dlist_push_tail(&txn->subtxns, &subtxn->node);
685  txn->nsubtxns++;
686  }
687  else if (new_top)
688  {
689  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
690  }
691 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:670
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1265 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1269 {
1270  ReorderBufferTXN *txn;
1271  volatile Snapshot snapshot_now;
1272  volatile CommandId command_id = FirstCommandId;
1273  bool using_subtxn;
1274  ReorderBufferIterTXNState *volatile iterstate = NULL;
1275 
1276  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1277  false);
1278 
1279  /* unknown transaction, nothing to replay */
1280  if (txn == NULL)
1281  return;
1282 
1283  txn->final_lsn = commit_lsn;
1284  txn->end_lsn = end_lsn;
1285  txn->commit_time = commit_time;
1286  txn->origin_id = origin_id;
1287  txn->origin_lsn = origin_lsn;
1288 
1289  /*
1290  * If this transaction didn't have any real changes in our database, it's
1291  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1292  * transferred its snapshot to this transaction if it had one and the
1293  * toplevel tx didn't.
1294  */
1295  if (txn->base_snapshot == NULL)
1296  {
1297  Assert(txn->ninvalidations == 0);
1298  ReorderBufferCleanupTXN(rb, txn);
1299  return;
1300  }
1301 
1302  snapshot_now = txn->base_snapshot;
1303 
1304  /* build data to be able to lookup the CommandIds of catalog tuples */
1306 
1307  /* setup the initial snapshot */
1308  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1309 
1310  /*
1311  * Decoding needs access to syscaches et al., which in turn use
1312  * heavyweight locks and such. Thus we need to have enough state around to
1313  * keep track of those. The easiest way is to simply use a transaction
1314  * internally. That also allows us to easily enforce that nothing writes
1315  * to the database by checking for xid assignments.
1316  *
1317  * When we're called via the SQL SRF there's already a transaction
1318  * started, so start an explicit subtransaction there.
1319  */
1320  using_subtxn = IsTransactionOrTransactionBlock();
1321 
1322  PG_TRY();
1323  {
1324  ReorderBufferChange *change;
1325  ReorderBufferChange *specinsert = NULL;
1326 
1327  if (using_subtxn)
1328  BeginInternalSubTransaction("replay");
1329  else
1331 
1332  rb->begin(rb, txn);
1333 
1334  iterstate = ReorderBufferIterTXNInit(rb, txn);
1335  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1336  {
1337  Relation relation = NULL;
1338  Oid reloid;
1339 
1340  switch (change->action)
1341  {
1343 
1344  /*
1345  * Confirmation for speculative insertion arrived. Simply
1346  * use as a normal record. It'll be cleaned up at the end
1347  * of INSERT processing.
1348  */
1349  Assert(specinsert->data.tp.oldtuple == NULL);
1350  change = specinsert;
1352 
1353  /* intentionally fall through */
1357  Assert(snapshot_now);
1358 
1359  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1360  change->data.tp.relnode.relNode);
1361 
1362  /*
1363  * Catalog tuple without data, emitted while catalog was
1364  * in the process of being rewritten.
1365  */
1366  if (reloid == InvalidOid &&
1367  change->data.tp.newtuple == NULL &&
1368  change->data.tp.oldtuple == NULL)
1369  goto change_done;
1370  else if (reloid == InvalidOid)
1371  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1372  relpathperm(change->data.tp.relnode,
1373  MAIN_FORKNUM));
1374 
1375  relation = RelationIdGetRelation(reloid);
1376 
1377  if (relation == NULL)
1378  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1379  reloid,
1380  relpathperm(change->data.tp.relnode,
1381  MAIN_FORKNUM));
1382 
1383  if (!RelationIsLogicallyLogged(relation))
1384  goto change_done;
1385 
1386  /*
1387  * For now ignore sequence changes entirely. Most of the
1388  * time they don't log changes using records we
1389  * understand, so it doesn't make sense to handle the few
1390  * cases we do.
1391  */
1392  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1393  goto change_done;
1394 
1395  /* user-triggered change */
1396  if (!IsToastRelation(relation))
1397  {
1398  ReorderBufferToastReplace(rb, txn, relation, change);
1399  rb->apply_change(rb, txn, relation, change);
1400 
1401  /*
1402  * Only clear reassembled toast chunks if we're sure
1403  * they're not required anymore. The creator of the
1404  * tuple tells us.
1405  */
1406  if (change->data.tp.clear_toast_afterwards)
1407  ReorderBufferToastReset(rb, txn);
1408  }
1409  /* we're not interested in toast deletions */
1410  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1411  {
1412  /*
1413  * Need to reassemble the full toasted Datum in
1414  * memory, to ensure the chunks don't get reused till
1415  * we're done, remove it from the list of this
1416  * transaction's changes. Otherwise it will get
1417  * freed/reused while restoring spooled data from
1418  * disk.
1419  */
1420  dlist_delete(&change->node);
1421  ReorderBufferToastAppendChunk(rb, txn, relation,
1422  change);
1423  }
1424 
1425  change_done:
1426 
1427  /*
1428  * Either speculative insertion was confirmed, or it was
1429  * unsuccessful and the record isn't needed anymore.
1430  */
1431  if (specinsert != NULL)
1432  {
1433  ReorderBufferReturnChange(rb, specinsert);
1434  specinsert = NULL;
1435  }
1436 
1437  if (relation != NULL)
1438  {
1439  RelationClose(relation);
1440  relation = NULL;
1441  }
1442  break;
1443 
1445 
1446  /*
1447  * Speculative insertions are dealt with by delaying the
1448  * processing of the insert until the confirmation record
1449  * arrives. For that we simply unlink the record from the
1450  * chain, so it does not get freed/reused while restoring
1451  * spooled data from disk.
1452  *
1453  * This is safe in the face of concurrent catalog changes
1454  * because the relevant relation can't be changed between
1455  * speculative insertion and confirmation due to
1456  * CheckTableNotInUse() and locking.
1457  */
1458 
1459  /* clear out a pending (and thus failed) speculation */
1460  if (specinsert != NULL)
1461  {
1462  ReorderBufferReturnChange(rb, specinsert);
1463  specinsert = NULL;
1464  }
1465 
1466  /* and memorize the pending insertion */
1467  dlist_delete(&change->node);
1468  specinsert = change;
1469  break;
1470 
1472  rb->message(rb, txn, change->lsn, true,
1473  change->data.msg.prefix,
1474  change->data.msg.message_size,
1475  change->data.msg.message);
1476  break;
1477 
1479  /* get rid of the old */
1480  TeardownHistoricSnapshot(false);
1481 
1482  if (snapshot_now->copied)
1483  {
1484  ReorderBufferFreeSnap(rb, snapshot_now);
1485  snapshot_now =
1486  ReorderBufferCopySnap(rb, change->data.snapshot,
1487  txn, command_id);
1488  }
1489 
1490  /*
1491  * Restored from disk, need to be careful not to double
1492  * free. We could introduce refcounting for that, but for
1493  * now this seems infrequent enough not to care.
1494  */
1495  else if (change->data.snapshot->copied)
1496  {
1497  snapshot_now =
1498  ReorderBufferCopySnap(rb, change->data.snapshot,
1499  txn, command_id);
1500  }
1501  else
1502  {
1503  snapshot_now = change->data.snapshot;
1504  }
1505 
1506 
1507  /* and continue with the new one */
1508  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1509  break;
1510 
1512  Assert(change->data.command_id != InvalidCommandId);
1513 
1514  if (command_id < change->data.command_id)
1515  {
1516  command_id = change->data.command_id;
1517 
1518  if (!snapshot_now->copied)
1519  {
1520  /* we don't use the global one anymore */
1521  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1522  txn, command_id);
1523  }
1524 
1525  snapshot_now->curcid = command_id;
1526 
1527  TeardownHistoricSnapshot(false);
1528  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1529 
1530  /*
1531  * Every time the CommandId is incremented, we could
1532  * see new catalog contents, so execute all
1533  * invalidations.
1534  */
1536  }
1537 
1538  break;
1539 
1541  elog(ERROR, "tuplecid value in changequeue");
1542  break;
1543  }
1544  }
1545 
1546  /*
1547  * There's a speculative insertion remaining, just clean it up, it
1548  * can't have been successful, otherwise we'd have gotten a confirmation
1549  * record.
1550  */
1551  if (specinsert)
1552  {
1553  ReorderBufferReturnChange(rb, specinsert);
1554  specinsert = NULL;
1555  }
1556 
1557  /* clean up the iterator */
1558  ReorderBufferIterTXNFinish(rb, iterstate);
1559  iterstate = NULL;
1560 
1561  /* call commit callback */
1562  rb->commit(rb, txn, commit_lsn);
1563 
1564  /* this is just a sanity check against bad output plugin behaviour */
1566  elog(ERROR, "output plugin used XID %u",
1568 
1569  /* cleanup */
1570  TeardownHistoricSnapshot(false);
1571 
1572  /*
1573  * Aborting the current (sub-)transaction as a whole has the right
1574  * semantics. We want all locks acquired in here to be released, not
1575  * reassigned to the parent and we do not want any database access
1576  * have persistent effects.
1577  */
1579 
1580  /* make sure there's no cache pollution */
1582 
1583  if (using_subtxn)
1585 
1586  if (snapshot_now->copied)
1587  ReorderBufferFreeSnap(rb, snapshot_now);
1588 
1589  /* remove potential on-disk data, and deallocate */
1590  ReorderBufferCleanupTXN(rb, txn);
1591  }
1592  PG_CATCH();
1593  {
1594  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1595  if (iterstate)
1596  ReorderBufferIterTXNFinish(rb, iterstate);
1597 
1599 
1600  /*
1601  * Force cache invalidation to happen outside of a valid transaction
1602  * to prevent catalog access as we just caught an error.
1603  */
1605 
1606  /* make sure there's no cache pollution */
1608 
1609  if (using_subtxn)
1611 
1612  if (snapshot_now->copied)
1613  ReorderBufferFreeSnap(rb, snapshot_now);
1614 
1615  /* remove potential on-disk data, and deallocate */
1616  ReorderBufferCleanupTXN(rb, txn);
1617 
1618  PG_RE_THROW();
1619  }
1620  PG_END_TRY();
1621 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:459
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@93 data
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4465
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2019
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:584
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:461
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
struct ReorderBufferChange::@93::@94 tp
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4276
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:435
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2167
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:462
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:670
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2673
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4171
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2003
struct ReorderBufferChange::@93::@95 msg
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2078
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 698 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

701 {
702  ReorderBufferTXN *txn;
703  ReorderBufferTXN *subtxn;
704 
705  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
706  InvalidXLogRecPtr, false);
707 
708  /*
709  * No need to do anything if that subtxn didn't contain any changes
710  */
711  if (!subtxn)
712  return;
713 
714  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
715 
716  if (txn == NULL)
717  elog(ERROR, "subxact logged without previous toplevel record");
718 
719  /*
720  * Pass our base snapshot to the parent transaction if it doesn't have
721  * one, or ours is older. That can happen if there are no changes in the
722  * toplevel transaction but in one of the child transactions. This allows
723  * the parent to simply use its base snapshot initially.
724  */
725  if (subtxn->base_snapshot != NULL &&
726  (txn->base_snapshot == NULL ||
727  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
728  {
729  txn->base_snapshot = subtxn->base_snapshot;
730  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
731  subtxn->base_snapshot = NULL;
732  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
733  }
734 
735  subtxn->final_lsn = commit_lsn;
736  subtxn->end_lsn = end_lsn;
737 
738  if (!subtxn->is_known_as_subxact)
739  {
740  subtxn->is_known_as_subxact = true;
741  Assert(subtxn->nsubtxns == 0);
742 
743  /* remove from lsn order list of top-level transactions */
744  dlist_delete(&subtxn->node);
745 
746  /* add to subtransaction list */
747  dlist_push_tail(&txn->subtxns, &subtxn->node);
748  txn->nsubtxns++;
749  }
750 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:670
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1705 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1706 {
1707  ReorderBufferTXN *txn;
1708 
1709  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1710  false);
1711 
1712  /* unknown, nothing to forget */
1713  if (txn == NULL)
1714  return;
1715 
1716  /* cosmetic... */
1717  txn->final_lsn = lsn;
1718 
1719  /*
1720  * Process cache invalidation messages if there are any. Even if we're not
1721  * interested in the transaction's contents, it could have manipulated the
1722  * catalog and we need to update the caches according to that.
1723  */
1724  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1725  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1726  txn->invalidations);
1727  else
1728  Assert(txn->ninvalidations == 0);
1729 
1730  /* remove potential on-disk data, and deallocate */
1731  ReorderBufferCleanupTXN(rb, txn);
1732 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:670
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * )

Definition at line 276 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

277 {
278  MemoryContext context = rb->context;
279 
280  /*
281  * We free separately allocated data by entirely scrapping reorderbuffer's
282  * memory context.
283  */
284  MemoryContextDelete(context);
285 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 344 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

345 {
346  ReorderBufferChange *change;
347 
348  change = (ReorderBufferChange *)
349  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
350 
351  memset(change, 0, sizeof(ReorderBufferChange));
352  return change;
353 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * )

Definition at line 630 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

631 {
632  ReorderBufferTXN *txn;
633 
634  if (dlist_is_empty(&rb->toplevel_by_lsn))
635  return NULL;
636 
637  AssertTXNLsnOrder(rb);
638 
639  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
640 
641  Assert(!txn->is_known_as_subxact);
642  Assert(txn->first_lsn != InvalidXLogRecPtr);
643  return txn;
644 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:670
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * ,
Size  tuple_len 
)

Definition at line 413 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

414 {
415  ReorderBufferTupleBuf *tuple;
416  Size alloc_len;
417 
418  alloc_len = tuple_len + SizeofHeapTupleHeader;
419 
420  tuple = (ReorderBufferTupleBuf *)
421  MemoryContextAlloc(rb->tup_context,
422  sizeof(ReorderBufferTupleBuf) +
423  MAXIMUM_ALIGNOF + alloc_len);
424  tuple->alloc_tuple_size = alloc_len;
425  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
426 
427  return tuple;
428 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:404
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 1741 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1743 {
1744  bool use_subtxn = IsTransactionOrTransactionBlock();
1745  int i;
1746 
1747  if (use_subtxn)
1748  BeginInternalSubTransaction("replay");
1749 
1750  /*
1751  * Force invalidations to happen outside of a valid transaction - that way
1752  * entries will just be marked as invalid without accessing the catalog.
1753  * That's advantageous because we don't need to setup the full state
1754  * necessary for catalog access.
1755  */
1756  if (use_subtxn)
1757  AbortCurrentTransaction();
1758 
1759  for (i = 0; i < ninvalidations; i++)
1760  LocalExecuteInvalidationMessage(&invalidations[i]);
1761 
1762  if (use_subtxn)
1763  RollbackAndReleaseCurrentSubTransaction();
1764 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4465
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4276
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4171
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1777 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1778 {
1779  /* many records won't have an xid assigned, centralize check here */
1780  if (xid != InvalidTransactionId)
1781  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1782 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange *  
)

Definition at line 532 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

534 {
535  ReorderBufferTXN *txn;
536 
537  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
538 
539  change->lsn = lsn;
540  Assert(InvalidXLogRecPtr != lsn);
541  dlist_push_tail(&txn->changes, &change->node);
542  txn->nentries++;
543  txn->nentries_mem++;
544 
545  ReorderBufferCheckSerializeTXN(rb, txn);
546 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:670
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 552 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

556 {
557  if (transactional)
558  {
559  MemoryContext oldcontext;
560  ReorderBufferChange *change;
561 
562  Assert(xid != InvalidTransactionId);
563 
564  oldcontext = MemoryContextSwitchTo(rb->context);
565 
566  change = ReorderBufferGetChange(rb);
567  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
568  change->data.msg.prefix = pstrdup(prefix);
569  change->data.msg.message_size = message_size;
570  change->data.msg.message = palloc(message_size);
571  memcpy(change->data.msg.message, message, message_size);
572 
573  ReorderBufferQueueChange(rb, xid, lsn, change);
574 
575  MemoryContextSwitchTo(oldcontext);
576  }
577  else
578  {
579  ReorderBufferTXN *txn = NULL;
580  volatile Snapshot snapshot_now = snapshot;
581 
582  if (xid != InvalidTransactionId)
583  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584 
585  /* setup snapshot to allow catalog access */
586  SetupHistoricSnapshot(snapshot_now, NULL);
587  PG_TRY();
588  {
589  rb->message(rb, txn, lsn, false, prefix, message_size, message);
590 
591  TeardownHistoricSnapshot(false);
592  }
593  PG_CATCH();
594  {
595  TeardownHistoricSnapshot(true);
596  PG_RE_THROW();
597  }
598  PG_END_TRY();
599  }
600 }
char * pstrdup(const char *in)
Definition: mcxt.c:1076
union ReorderBufferChange::@93 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2019
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define InvalidTransactionId
Definition: transam.h:31
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:670
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:848
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2003
struct ReorderBufferChange::@93::@95 msg
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange *  
)

Definition at line 362 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

363 {
364  /* free contained data */
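365  switch (change->action)
366  {
367  case REORDER_BUFFER_CHANGE_INSERT:
368  case REORDER_BUFFER_CHANGE_UPDATE:
369  case REORDER_BUFFER_CHANGE_DELETE:
370  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
371  if (change->data.tp.newtuple)
372  {
373  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
374  change->data.tp.newtuple = NULL;
375  }
376 
377  if (change->data.tp.oldtuple)
378  {
379  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
380  change->data.tp.oldtuple = NULL;
381  }
382  break;
383  case REORDER_BUFFER_CHANGE_MESSAGE:
384  if (change->data.msg.prefix != NULL)
385  pfree(change->data.msg.prefix);
386  change->data.msg.prefix = NULL;
387  if (change->data.msg.message != NULL)
388  pfree(change->data.msg.message);
389  change->data.msg.message = NULL;
390  break;
391  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
392  if (change->data.snapshot)
393  {
394  ReorderBufferFreeSnap(rb, change->data.snapshot);
395  change->data.snapshot = NULL;
396  }
397  break;
398  /* no data in addition to the struct itself */
399  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
400  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
401  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
402  break;
403  }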
404 
405  pfree(change);
406 }
void pfree(void *pointer)
Definition: mcxt.c:949
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf * tuple 
)

Definition at line 437 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

438 {
439  pfree(tuple);
440 }
void pfree(void *pointer)
Definition: mcxt.c:949

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * ,
XLogRecPtr  ptr 
)

Definition at line 647 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

648 {
649  rb->current_restart_decoding_lsn = ptr;
650 }

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1943 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1944 {
1945  ReorderBufferTXN *txn;
1946 
1947  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1948  false);
1949 
1950  /* transaction isn't known yet, ergo no snapshot */
1951  if (txn == NULL)
1952  return false;
1953 
1954  /*
1955  * TODO: It would be a nice improvement if we would check the toplevel
1956  * transaction in subtransactions, but we'd need to keep track of a bit
1957  * more state.
1958  */
1959  return txn->base_snapshot != NULL;
1960 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1927 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

1928 {
1929  ReorderBufferTXN *txn;
1930 
1931  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1932  false);
1933  if (txn == NULL)
1934  return false;
1935 
1936  return txn->has_catalog_changes;
1937 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1912 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1914 {
1915  ReorderBufferTXN *txn;
1916 
1917  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1918 
1919  txn->has_catalog_changes = true;
1920 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 2558 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, PANIC, ReadDir(), ReplicationSlotValidateName(), S_ISDIR, and stat.

Referenced by StartupXLOG().

2559 {
2560  DIR *logical_dir;
2561  struct dirent *logical_de;
2562 
2563  DIR *spill_dir;
2564  struct dirent *spill_de;
2565 
2566  logical_dir = AllocateDir("pg_replslot");
2567  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2568  {
2569  struct stat statbuf;
2570  char path[MAXPGPATH * 2 + 12];
2571 
2572  if (strcmp(logical_de->d_name, ".") == 0 ||
2573  strcmp(logical_de->d_name, "..") == 0)
2574  continue;
2575 
2576  /* if it cannot be a slot, skip the directory */
2577  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2578  continue;
2579 
2580  /*
2581  * ok, has to be a surviving logical slot, iterate and delete
2582  * everything starting with xid-*
2583  */
2584  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2585 
2586  /* we're only creating directories here, skip if it's not our's */
2587  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2588  continue;
2589 
2590  spill_dir = AllocateDir(path);
2591  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2592  {
2593  if (strcmp(spill_de->d_name, ".") == 0 ||
2594  strcmp(spill_de->d_name, "..") == 0)
2595  continue;
2596 
2597  /* only look at names that can be ours */
2598  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2599  {
2600  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2601  spill_de->d_name);
2602 
2603  if (unlink(path) != 0)
2604  ereport(PANIC,
2605  (errcode_for_file_access(),
2606  errmsg("could not remove file \"%s\": %m",
2607  path)));
2608  }
2609  }
2610  FreeDir(spill_dir);
2611  }
2612  FreeDir(logical_dir);
2613 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2373
#define ereport(elevel, rest)
Definition: elog.h:122
#define stat(a, b)
Definition: win32_port.h:266
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2439
#define S_ISDIR(m)
Definition: win32_port.h:307
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
int FreeDir(DIR *dir)
Definition: fd.c:2482