PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 

Typedefs

typedef struct
ReorderBufferTupleBuf 
ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB )(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
}
 

Functions

ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Macro Definition Documentation

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

Definition at line 275 of file reorderbuffer.h.

typedef void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 278 of file reorderbuffer.h.

typedef void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 285 of file reorderbuffer.h.

typedef void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 290 of file reorderbuffer.h.

typedef void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 296 of file reorderbuffer.h.

Enumeration Type Documentation

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 

Definition at line 52 of file reorderbuffer.h.

Function Documentation

void ReorderBufferAbort ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1682 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

1683 {
1684  ReorderBufferTXN *txn;
1685 
1686  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1687  false);
1688 
1689  /* unknown, nothing to remove */
1690  if (txn == NULL)
1691  return;
1692 
1693  /* cosmetic... */
1694  txn->final_lsn = lsn;
1695 
1696  /* remove potential on-disk data, and deallocate */
1697  ReorderBufferCleanupTXN(rb, txn);
1698 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferAbortOld ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1708 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1709 {
1710  dlist_mutable_iter it;
1711 
1712  /*
1713  * Iterate through all (potential) toplevel TXNs and abort all that are
1714  * older than what possibly can be running. Once we've found the first
1715  * that is alive we stop, there might be some that acquired an xid earlier
1716  * but started writing later, but it's unlikely and they will cleaned up
1717  * in a later call to ReorderBufferAbortOld().
1718  */
1719  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1720  {
1721  ReorderBufferTXN *txn;
1722 
1723  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1724 
1725  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1726  {
1727  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1728 
1729  /* remove potential on-disk data, and deallocate this tx */
1730  ReorderBufferCleanupTXN(rb, txn);
1731  }
1732  else
1733  return;
1734  }
1735 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog
Definition: elog.h:219
void ReorderBufferAddInvalidations ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 1920 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1923 {
1924  ReorderBufferTXN *txn;
1925 
1926  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1927 
1928  if (txn->ninvalidations != 0)
1929  elog(ERROR, "only ever add one set of invalidations");
1930 
1931  Assert(nmsgs > 0);
1932 
1933  txn->ninvalidations = nmsgs;
1935  MemoryContextAlloc(rb->context,
1936  sizeof(SharedInvalidationMessage) * nmsgs);
1937  memcpy(txn->invalidations, msgs,
1938  sizeof(SharedInvalidationMessage) * nmsgs);
1939 }
#define ERROR
Definition: elog.h:43
#define Assert(condition)
Definition: c.h:664
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
#define elog
Definition: elog.h:219
void ReorderBufferAddNewCommandId ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1876 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1878 {
1880 
1881  change->data.command_id = cid;
1883 
1884  ReorderBufferQueueChange(rb, xid, lsn, change);
1885 }
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void ReorderBufferAddNewTupleCids ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1892 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, and ReorderBufferTXN::tuplecids.

Referenced by SnapBuildProcessNewCid().

1896 {
1898  ReorderBufferTXN *txn;
1899 
1900  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1901 
1902  change->data.tuplecid.node = node;
1903  change->data.tuplecid.tid = tid;
1904  change->data.tuplecid.cmin = cmin;
1905  change->data.tuplecid.cmax = cmax;
1906  change->data.tuplecid.combocid = combocid;
1907  change->lsn = lsn;
1909 
1910  dlist_push_tail(&txn->tuplecids, &change->node);
1911  txn->ntuplecids++;
1912 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@93::@96 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids
void ReorderBufferAddSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)
ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 223 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_tuplebufs, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), slist_init(), ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

224 {
226  HASHCTL hash_ctl;
227  MemoryContext new_ctx;
228 
229  /* allocate memory in own context, to have better accountability */
231  "ReorderBuffer",
233 
234  buffer =
235  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
236 
237  memset(&hash_ctl, 0, sizeof(hash_ctl));
238 
239  buffer->context = new_ctx;
240 
241  buffer->change_context = SlabContextCreate(new_ctx,
242  "Change",
244  sizeof(ReorderBufferChange));
245 
246  buffer->txn_context = SlabContextCreate(new_ctx,
247  "TXN",
249  sizeof(ReorderBufferTXN));
250 
251  hash_ctl.keysize = sizeof(TransactionId);
252  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
253  hash_ctl.hcxt = buffer->context;
254 
255  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
257 
259  buffer->by_txn_last_txn = NULL;
260 
261  buffer->nr_cached_tuplebufs = 0;
262 
263  buffer->outbuf = NULL;
264  buffer->outbufsize = 0;
265 
267 
268  dlist_init(&buffer->toplevel_by_lsn);
269  slist_init(&buffer->cached_tuplebufs);
270 
271  return buffer;
272 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:391
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
MemoryContext change_context
static void slist_init(slist_head *head)
Definition: ilist.h:554
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:194
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext txn_context
slist_head cached_tuplebufs
void ReorderBufferAssignChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 699 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeXactOp().

701 {
702  ReorderBufferTXN *txn;
703  ReorderBufferTXN *subtxn;
704  bool new_top;
705  bool new_sub;
706 
707  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
708  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
709 
710  if (new_sub)
711  {
712  /*
713  * we assign subtransactions to top level transaction even if we don't
714  * have data for it yet, assignment records frequently reference xids
715  * that have not yet produced any records. Knowing those aren't top
716  * level xids allows us to make processing cheaper in some places.
717  */
718  dlist_push_tail(&txn->subtxns, &subtxn->node);
719  txn->nsubtxns++;
720  }
721  else if (!subtxn->is_known_as_subxact)
722  {
723  subtxn->is_known_as_subxact = true;
724  Assert(subtxn->nsubtxns == 0);
725 
726  /* remove from lsn order list of top-level transactions */
727  dlist_delete(&subtxn->node);
728 
729  /* add to toplevel transaction */
730  dlist_push_tail(&txn->subtxns, &subtxn->node);
731  txn->nsubtxns++;
732  }
733  else if (new_top)
734  {
735  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
736  }
737 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:664
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
void ReorderBufferCommit ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1311 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, and ReorderBufferTXN::tuplecid_hash.

Referenced by DecodeCommit().

1315 {
1316  ReorderBufferTXN *txn;
1317  volatile Snapshot snapshot_now;
1318  volatile CommandId command_id = FirstCommandId;
1319  bool using_subtxn;
1320  ReorderBufferIterTXNState *volatile iterstate = NULL;
1321 
1322  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1323  false);
1324 
1325  /* unknown transaction, nothing to replay */
1326  if (txn == NULL)
1327  return;
1328 
1329  txn->final_lsn = commit_lsn;
1330  txn->end_lsn = end_lsn;
1331  txn->commit_time = commit_time;
1332  txn->origin_id = origin_id;
1333  txn->origin_lsn = origin_lsn;
1334 
1335  /*
1336  * If this transaction didn't have any real changes in our database, it's
1337  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1338  * transferred its snapshot to this transaction if it had one and the
1339  * toplevel tx didn't.
1340  */
1341  if (txn->base_snapshot == NULL)
1342  {
1343  Assert(txn->ninvalidations == 0);
1344  ReorderBufferCleanupTXN(rb, txn);
1345  return;
1346  }
1347 
1348  snapshot_now = txn->base_snapshot;
1349 
1350  /* build data to be able to lookup the CommandIds of catalog tuples */
1352 
1353  /* setup the initial snapshot */
1354  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1355 
1356  /*
1357  * Decoding needs access to syscaches et al., which in turn use
1358  * heavyweight locks and such. Thus we need to have enough state around to
1359  * keep track of those. The easiest way is to simply use a transaction
1360  * internally. That also allows us to easily enforce that nothing writes
1361  * to the database by checking for xid assignments.
1362  *
1363  * When we're called via the SQL SRF there's already a transaction
1364  * started, so start an explicit subtransaction there.
1365  */
1366  using_subtxn = IsTransactionOrTransactionBlock();
1367 
1368  PG_TRY();
1369  {
1370  ReorderBufferChange *change;
1371  ReorderBufferChange *specinsert = NULL;
1372 
1373  if (using_subtxn)
1374  BeginInternalSubTransaction("replay");
1375  else
1377 
1378  rb->begin(rb, txn);
1379 
1380  iterstate = ReorderBufferIterTXNInit(rb, txn);
1381  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1382  {
1383  Relation relation = NULL;
1384  Oid reloid;
1385 
1386  switch (change->action)
1387  {
1389 
1390  /*
1391  * Confirmation for speculative insertion arrived. Simply
1392  * use as a normal record. It'll be cleaned up at the end
1393  * of INSERT processing.
1394  */
1395  Assert(specinsert->data.tp.oldtuple == NULL);
1396  change = specinsert;
1398 
1399  /* intentionally fall through */
1403  Assert(snapshot_now);
1404 
1405  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1406  change->data.tp.relnode.relNode);
1407 
1408  /*
1409  * Catalog tuple without data, emitted while catalog was
1410  * in the process of being rewritten.
1411  */
1412  if (reloid == InvalidOid &&
1413  change->data.tp.newtuple == NULL &&
1414  change->data.tp.oldtuple == NULL)
1415  goto change_done;
1416  else if (reloid == InvalidOid)
1417  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1418  relpathperm(change->data.tp.relnode,
1419  MAIN_FORKNUM));
1420 
1421  relation = RelationIdGetRelation(reloid);
1422 
1423  if (relation == NULL)
1424  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1425  reloid,
1426  relpathperm(change->data.tp.relnode,
1427  MAIN_FORKNUM));
1428 
1429  if (!RelationIsLogicallyLogged(relation))
1430  goto change_done;
1431 
1432  /*
1433  * For now ignore sequence changes entirely. Most of the
1434  * time they don't log changes using records we
1435  * understand, so it doesn't make sense to handle the few
1436  * cases we do.
1437  */
1438  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1439  goto change_done;
1440 
1441  /* user-triggered change */
1442  if (!IsToastRelation(relation))
1443  {
1444  ReorderBufferToastReplace(rb, txn, relation, change);
1445  rb->apply_change(rb, txn, relation, change);
1446 
1447  /*
1448  * Only clear reassembled toast chunks if we're sure
1449  * they're not required anymore. The creator of the
1450  * tuple tells us.
1451  */
1452  if (change->data.tp.clear_toast_afterwards)
1453  ReorderBufferToastReset(rb, txn);
1454  }
1455  /* we're not interested in toast deletions */
1456  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1457  {
1458  /*
1459  * Need to reassemble the full toasted Datum in
1460  * memory, to ensure the chunks don't get reused till
1461  * we're done remove it from the list of this
1462  * transaction's changes. Otherwise it will get
1463  * freed/reused while restoring spooled data from
1464  * disk.
1465  */
1466  dlist_delete(&change->node);
1467  ReorderBufferToastAppendChunk(rb, txn, relation,
1468  change);
1469  }
1470 
1471  change_done:
1472 
1473  /*
1474  * Either speculative insertion was confirmed, or it was
1475  * unsuccessful and the record isn't needed anymore.
1476  */
1477  if (specinsert != NULL)
1478  {
1479  ReorderBufferReturnChange(rb, specinsert);
1480  specinsert = NULL;
1481  }
1482 
1483  if (relation != NULL)
1484  {
1485  RelationClose(relation);
1486  relation = NULL;
1487  }
1488  break;
1489 
1491 
1492  /*
1493  * Speculative insertions are dealt with by delaying the
1494  * processing of the insert until the confirmation record
1495  * arrives. For that we simply unlink the record from the
1496  * chain, so it does not get freed/reused while restoring
1497  * spooled data from disk.
1498  *
1499  * This is safe in the face of concurrent catalog changes
1500  * because the relevant relation can't be changed between
1501  * speculative insertion and confirmation due to
1502  * CheckTableNotInUse() and locking.
1503  */
1504 
1505  /* clear out a pending (and thus failed) speculation */
1506  if (specinsert != NULL)
1507  {
1508  ReorderBufferReturnChange(rb, specinsert);
1509  specinsert = NULL;
1510  }
1511 
1512  /* and memorize the pending insertion */
1513  dlist_delete(&change->node);
1514  specinsert = change;
1515  break;
1516 
1518  rb->message(rb, txn, change->lsn, true,
1519  change->data.msg.prefix,
1520  change->data.msg.message_size,
1521  change->data.msg.message);
1522  break;
1523 
1525  /* get rid of the old */
1526  TeardownHistoricSnapshot(false);
1527 
1528  if (snapshot_now->copied)
1529  {
1530  ReorderBufferFreeSnap(rb, snapshot_now);
1531  snapshot_now =
1532  ReorderBufferCopySnap(rb, change->data.snapshot,
1533  txn, command_id);
1534  }
1535 
1536  /*
1537  * Restored from disk, need to be careful not to double
1538  * free. We could introduce refcounting for that, but for
1539  * now this seems infrequent enough not to care.
1540  */
1541  else if (change->data.snapshot->copied)
1542  {
1543  snapshot_now =
1544  ReorderBufferCopySnap(rb, change->data.snapshot,
1545  txn, command_id);
1546  }
1547  else
1548  {
1549  snapshot_now = change->data.snapshot;
1550  }
1551 
1552 
1553  /* and continue with the new one */
1554  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1555  break;
1556 
1558  Assert(change->data.command_id != InvalidCommandId);
1559 
1560  if (command_id < change->data.command_id)
1561  {
1562  command_id = change->data.command_id;
1563 
1564  if (!snapshot_now->copied)
1565  {
1566  /* we don't use the global one anymore */
1567  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1568  txn, command_id);
1569  }
1570 
1571  snapshot_now->curcid = command_id;
1572 
1573  TeardownHistoricSnapshot(false);
1574  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1575 
1576  /*
1577  * Every time the CommandId is incremented, we could
1578  * see new catalog contents, so execute all
1579  * invalidations.
1580  */
1582  }
1583 
1584  break;
1585 
1587  elog(ERROR, "tuplecid value in changequeue");
1588  break;
1589  }
1590  }
1591 
1592  /*
1593  * There's a speculative insertion remaining, just clean in up, it
1594  * can't have been successful, otherwise we'd gotten a confirmation
1595  * record.
1596  */
1597  if (specinsert)
1598  {
1599  ReorderBufferReturnChange(rb, specinsert);
1600  specinsert = NULL;
1601  }
1602 
1603  /* clean up the iterator */
1604  ReorderBufferIterTXNFinish(rb, iterstate);
1605  iterstate = NULL;
1606 
1607  /* call commit callback */
1608  rb->commit(rb, txn, commit_lsn);
1609 
1610  /* this is just a sanity check against bad output plugin behaviour */
1612  elog(ERROR, "output plugin used XID %u",
1614 
1615  /* cleanup */
1616  TeardownHistoricSnapshot(false);
1617 
1618  /*
1619  * Aborting the current (sub-)transaction as a whole has the right
1620  * semantics. We want all locks acquired in here to be released, not
1621  * reassigned to the parent and we do not want any database access
1622  * have persistent effects.
1623  */
1625 
1626  /* make sure there's no cache pollution */
1628 
1629  if (using_subtxn)
1631 
1632  if (snapshot_now->copied)
1633  ReorderBufferFreeSnap(rb, snapshot_now);
1634 
1635  /* remove potential on-disk data, and deallocate */
1636  ReorderBufferCleanupTXN(rb, txn);
1637  }
1638  PG_CATCH();
1639  {
1640  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1641  if (iterstate)
1642  ReorderBufferIterTXNFinish(rb, iterstate);
1643 
1645 
1646  /*
1647  * Force cache invalidation to happen outside of a valid transaction
1648  * to prevent catalog access as we just caught an error.
1649  */
1651 
1652  /* make sure there's no cache pollution */
1654 
1655  if (using_subtxn)
1657 
1658  if (snapshot_now->copied)
1659  ReorderBufferFreeSnap(rb, snapshot_now);
1660 
1661  /* remove potential on-disk data, and deallocate */
1662  ReorderBufferCleanupTXN(rb, txn);
1663 
1664  PG_RE_THROW();
1665  }
1666  PG_END_TRY();
1667 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:405
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2992
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@93 data
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4473
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:575
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:407
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
struct ReorderBufferChange::@93::@94 tp
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4284
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:435
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2164
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:408
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:664
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2681
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4179
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
struct ReorderBufferChange::@93::@95 msg
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2075
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferCommitChild ( ReorderBuffer ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 744 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeCommit().

747 {
748  ReorderBufferTXN *txn;
749  ReorderBufferTXN *subtxn;
750 
751  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
752  InvalidXLogRecPtr, false);
753 
754  /*
755  * No need to do anything if that subtxn didn't contain any changes
756  */
757  if (!subtxn)
758  return;
759 
760  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
761 
762  if (txn == NULL)
763  elog(ERROR, "subxact logged without previous toplevel record");
764 
765  /*
766  * Pass our base snapshot to the parent transaction if it doesn't have
767  * one, or ours is older. That can happen if there are no changes in the
768  * toplevel transaction but in one of the child transactions. This allows
769  * the parent to simply use its base snapshot initially.
770  */
771  if (subtxn->base_snapshot != NULL &&
772  (txn->base_snapshot == NULL ||
773  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
774  {
775  txn->base_snapshot = subtxn->base_snapshot;
776  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
777  subtxn->base_snapshot = NULL;
779  }
780 
781  subtxn->final_lsn = commit_lsn;
782  subtxn->end_lsn = end_lsn;
783 
784  if (!subtxn->is_known_as_subxact)
785  {
786  subtxn->is_known_as_subxact = true;
787  Assert(subtxn->nsubtxns == 0);
788 
789  /* remove from lsn order list of top-level transactions */
790  dlist_delete(&subtxn->node);
791 
792  /* add to subtransaction list */
793  dlist_push_tail(&txn->subtxns, &subtxn->node);
794  txn->nsubtxns++;
795  }
796 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:664
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
void ReorderBufferForget ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1751 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1752 {
1753  ReorderBufferTXN *txn;
1754 
1755  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1756  false);
1757 
1758  /* unknown, nothing to forget */
1759  if (txn == NULL)
1760  return;
1761 
1762  /* cosmetic... */
1763  txn->final_lsn = lsn;
1764 
1765  /*
1766  * Process cache invalidation messages if there are any. Even if we're not
1767  * interested in the transaction's contents, it could have manipulated the
1768  * catalog and we need to update the caches according to that.
1769  */
1770  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1772  txn->invalidations);
1773  else
1774  Assert(txn->ninvalidations == 0);
1775 
1776  /* remove potential on-disk data, and deallocate */
1777  ReorderBufferCleanupTXN(rb, txn);
1778 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:664
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferFree ( ReorderBuffer )

Definition at line 278 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

279 {
280  MemoryContext context = rb->context;
281 
282  /*
283  * We free separately allocated data by entirely scrapping reorderbuffer's
284  * memory context.
285  */
286  MemoryContextDelete(context);
287 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer )

Definition at line 346 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

347 {
348  ReorderBufferChange *change;
349 
350  change = (ReorderBufferChange *)
351  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
352 
353  memset(change, 0, sizeof(ReorderBufferChange));
354  return change;
355 }
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer )

Definition at line 676 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

677 {
678  ReorderBufferTXN *txn;
679 
680  if (dlist_is_empty(&rb->toplevel_by_lsn))
681  return NULL;
682 
683  AssertTXNLsnOrder(rb);
684 
685  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
686 
689  return txn;
690 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:664
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer ,
Size  tuple_len 
)

Definition at line 415 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, Assert, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, MaxHeapTupleSize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBufferTupleBufData, SizeofHeapTupleHeader, slist_container, slist_pop_head_node(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

416 {
417  ReorderBufferTupleBuf *tuple;
418  Size alloc_len;
419 
420  alloc_len = tuple_len + SizeofHeapTupleHeader;
421 
422  /*
423  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
424  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
425  * generated for oldtuples can be bigger, as they don't have out-of-line
426  * toast columns.
427  */
428  if (alloc_len < MaxHeapTupleSize)
429  alloc_len = MaxHeapTupleSize;
430 
431 
432  /* if small enough, check the slab cache */
433  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
434  {
435  rb->nr_cached_tuplebufs--;
437  slist_pop_head_node(&rb->cached_tuplebufs));
439 #ifdef USE_ASSERT_CHECKING
440  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
442 #endif
443  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
444 #ifdef USE_ASSERT_CHECKING
445  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
447 #endif
448  }
449  else
450  {
451  tuple = (ReorderBufferTupleBuf *)
452  MemoryContextAlloc(rb->context,
453  sizeof(ReorderBufferTupleBuf) +
454  MAXIMUM_ALIGNOF + alloc_len);
455  tuple->alloc_tuple_size = alloc_len;
456  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
457  }
458 
459  return tuple;
460 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
#define Assert(condition)
Definition: c.h:664
size_t Size
Definition: c.h:350
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
void ReorderBufferImmediateInvalidation ( ReorderBuffer ,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1787 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1789 {
1790  bool use_subtxn = IsTransactionOrTransactionBlock();
1791  int i;
1792 
1793  if (use_subtxn)
1794  BeginInternalSubTransaction("replay");
1795 
1796  /*
1797  * Force invalidations to happen outside of a valid transaction - that way
1798  * entries will just be marked as invalid without accessing the catalog.
1799  * That's advantageous because we don't need to setup the full state
1800  * necessary for catalog access.
1801  */
1802  if (use_subtxn)
1804 
1805  for (i = 0; i < ninvalidations; i++)
1806  LocalExecuteInvalidationMessage(&invalidations[i]);
1807 
1808  if (use_subtxn)
1810 }
void AbortCurrentTransaction(void)
Definition: xact.c:2992
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4473
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4284
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4179
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i
void ReorderBufferProcessXid ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1823 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1824 {
1825  /* many records won't have an xid assigned, centralize check here */
1826  if (xid != InvalidTransactionId)
1827  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1828 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferQueueChange ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange  
)

Definition at line 578 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

580 {
581  ReorderBufferTXN *txn;
582 
583  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584 
585  change->lsn = lsn;
586  Assert(InvalidXLogRecPtr != lsn);
587  dlist_push_tail(&txn->changes, &change->node);
588  txn->nentries++;
589  txn->nentries_mem++;
590 
592 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:664
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage ( ReorderBuffer ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 598 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by DecodeLogicalMsgOp().

602 {
603  if (transactional)
604  {
605  MemoryContext oldcontext;
606  ReorderBufferChange *change;
607 
609 
610  oldcontext = MemoryContextSwitchTo(rb->context);
611 
612  change = ReorderBufferGetChange(rb);
614  change->data.msg.prefix = pstrdup(prefix);
615  change->data.msg.message_size = message_size;
616  change->data.msg.message = palloc(message_size);
617  memcpy(change->data.msg.message, message, message_size);
618 
619  ReorderBufferQueueChange(rb, xid, lsn, change);
620 
621  MemoryContextSwitchTo(oldcontext);
622  }
623  else
624  {
625  ReorderBufferTXN *txn = NULL;
626  volatile Snapshot snapshot_now = snapshot;
627 
628  if (xid != InvalidTransactionId)
629  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
630 
631  /* setup snapshot to allow catalog access */
632  SetupHistoricSnapshot(snapshot_now, NULL);
633  PG_TRY();
634  {
635  rb->message(rb, txn, lsn, false, prefix, message_size, message);
636 
638  }
639  PG_CATCH();
640  {
642  PG_RE_THROW();
643  }
644  PG_END_TRY();
645  }
646 }
char * pstrdup(const char *in)
Definition: mcxt.c:1076
union ReorderBufferChange::@93 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define InvalidTransactionId
Definition: transam.h:31
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:664
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:848
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
struct ReorderBufferChange::@93::@95 msg
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferReturnChange ( ReorderBuffer ,
ReorderBufferChange  
)

Definition at line 364 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

365 {
366  /* free contained data */
367  switch (change->action)
368  {
373  if (change->data.tp.newtuple)
374  {
375  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
376  change->data.tp.newtuple = NULL;
377  }
378 
379  if (change->data.tp.oldtuple)
380  {
381  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
382  change->data.tp.oldtuple = NULL;
383  }
384  break;
386  if (change->data.msg.prefix != NULL)
387  pfree(change->data.msg.prefix);
388  change->data.msg.prefix = NULL;
389  if (change->data.msg.message != NULL)
390  pfree(change->data.msg.message);
391  change->data.msg.message = NULL;
392  break;
394  if (change->data.snapshot)
395  {
396  ReorderBufferFreeSnap(rb, change->data.snapshot);
397  change->data.snapshot = NULL;
398  }
399  break;
400  /* no data in addition to the struct itself */
404  break;
405  }
406 
407  pfree(change);
408 }
void pfree(void *pointer)
Definition: mcxt.c:949
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnTupleBuf ( ReorderBuffer ,
ReorderBufferTupleBuf tuple 
)

Definition at line 469 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, ReorderBuffer::cached_tuplebufs, max_cached_tuplebufs, MaxHeapTupleSize, ReorderBufferTupleBuf::node, ReorderBuffer::nr_cached_tuplebufs, pfree(), slist_push_head(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferReturnChange().

470 {
471  /* check whether to put into the slab cache, oversized tuples never are */
472  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
473  rb->nr_cached_tuplebufs < max_cached_tuplebufs)
474  {
475  rb->nr_cached_tuplebufs++;
476  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
479  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
481  }
482  else
483  {
484  pfree(tuple);
485  }
486 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static const Size max_cached_tuplebufs
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:949
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
void ReorderBufferSetBaseSnapshot ( ReorderBuffer ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData snap 
)
void ReorderBufferSetRestartPoint ( ReorderBuffer ,
XLogRecPtr  ptr 
)

Definition at line 693 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

694 {
695  rb->current_restart_decoding_lsn = ptr;
696 }
bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1989 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1990 {
1991  ReorderBufferTXN *txn;
1992 
1993  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1994  false);
1995 
1996  /* transaction isn't known yet, ergo no snapshot */
1997  if (txn == NULL)
1998  return false;
1999 
2000  /*
2001  * TODO: It would be a nice improvement if we would check the toplevel
2002  * transaction in subtransactions, but we'd need to keep track of a bit
2003  * more state.
2004  */
2005  return txn->base_snapshot != NULL;
2006 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer ,
TransactionId  xid 
)

Definition at line 1973 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn().

1974 {
1975  ReorderBufferTXN *txn;
1976 
1977  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1978  false);
1979  if (txn == NULL)
1980  return false;
1981 
1982  return txn->has_catalog_changes;
1983 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferXidSetCatalogChanges ( ReorderBuffer ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1958 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1960 {
1961  ReorderBufferTXN *txn;
1962 
1963  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1964 
1965  txn->has_catalog_changes = true;
1966 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void StartupReorderBuffer ( void  )

Definition at line 2604 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, PANIC, ReadDir(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

2605 {
2606  DIR *logical_dir;
2607  struct dirent *logical_de;
2608 
2609  DIR *spill_dir;
2610  struct dirent *spill_de;
2611 
2612  logical_dir = AllocateDir("pg_replslot");
2613  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614  {
2615  struct stat statbuf;
2616  char path[MAXPGPATH * 2 + 12];
2617 
2618  if (strcmp(logical_de->d_name, ".") == 0 ||
2619  strcmp(logical_de->d_name, "..") == 0)
2620  continue;
2621 
2622  /* if it cannot be a slot, skip the directory */
2623  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624  continue;
2625 
2626  /*
2627  * ok, has to be a surviving logical slot, iterate and delete
2628  * everything starting with xid-*
2629  */
2630  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 
2632  /* we're only creating directories here, skip if it's not our's */
2633  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634  continue;
2635 
2636  spill_dir = AllocateDir(path);
2637  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638  {
2639  if (strcmp(spill_de->d_name, ".") == 0 ||
2640  strcmp(spill_de->d_name, "..") == 0)
2641  continue;
2642 
2643  /* only look at names that can be ours */
2644  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645  {
2646  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647  spill_de->d_name);
2648 
2649  if (unlink(path) != 0)
2650  ereport(PANIC,
2652  errmsg("could not remove file \"%s\": %m",
2653  path)));
2654  }
2655  }
2656  FreeDir(spill_dir);
2657  }
2658  FreeDir(logical_dir);
2659 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2367
#define ereport(elevel, rest)
Definition: elog.h:122
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2433
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define lstat(path, sb)
Definition: win32.h:262
int FreeDir(DIR *dir)
Definition: fd.c:2476