PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTupleBuf
 
struct  ReorderBufferChange
 
struct  ReorderBufferTXN
 
struct  ReorderBuffer
 

Macros

#define ReorderBufferTupleBufData(p)   ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 

Typedefs

typedef struct ReorderBufferTupleBuf ReorderBufferTupleBuf
 
typedef struct ReorderBufferChange ReorderBufferChange
 
typedef struct ReorderBufferTXN ReorderBufferTXN
 
typedef struct ReorderBuffer ReorderBuffer
 
typedef void(* ReorderBufferApplyChangeCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
typedef void(* ReorderBufferBeginCB )(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(* ReorderBufferCommitCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(* ReorderBufferMessageCB )(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 

Enumerations

enum  ReorderBufferChangeType {
  REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE,
  REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
  REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
}
 

Functions

ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *, ReorderBufferTupleBuf *tuple)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *)
 
void ReorderBufferReturnChange (ReorderBuffer *, ReorderBufferChange *)
 
void ReorderBufferQueueChange (ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
 
void ReorderBufferQueueMessage (ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void ReorderBufferCommit (ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAssignChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
void ReorderBufferAbort (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *, TransactionId xid)
 
void ReorderBufferForget (ReorderBuffer *, TransactionId, XLogRecPtr lsn)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddSnapshot (ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *, TransactionId xid)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *, XLogRecPtr ptr)
 
void StartupReorderBuffer (void)
 

Macro Definition Documentation

#define ReorderBufferTupleBufData (   p)    ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))

Typedef Documentation

Definition at line 266 of file reorderbuffer.h.

typedef void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 269 of file reorderbuffer.h.

typedef void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 276 of file reorderbuffer.h.

typedef void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 281 of file reorderbuffer.h.

typedef void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 287 of file reorderbuffer.h.

Enumeration Type Documentation

enum ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 

Definition at line 52 of file reorderbuffer.h.

Function Documentation

void ReorderBufferAbort ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1724 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, NULL, ReorderBufferCleanupTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

1725 {
1726  ReorderBufferTXN *txn;
1727 
1728  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1729  false);
1730 
1731  /* unknown, nothing to remove */
1732  if (txn == NULL)
1733  return;
1734 
1735  /* cosmetic... */
1736  txn->final_lsn = lsn;
1737 
1738  /* remove potential on-disk data, and deallocate */
1739  ReorderBufferCleanupTXN(rb, txn);
1740 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferAbortOld ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 1750 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1751 {
1752  dlist_mutable_iter it;
1753 
1754  /*
1755  * Iterate through all (potential) toplevel TXNs and abort all that are
1756  * older than what possibly can be running. Once we've found the first
1757  * that is alive we stop, there might be some that acquired an xid earlier
1758  * but started writing later, but it's unlikely and they will be cleaned up
1759  * in a later call to ReorderBufferAbortOld().
1760  */
1761  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1762  {
1763  ReorderBufferTXN *txn;
1764 
1765  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1766 
1767  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1768  {
1769  elog(DEBUG1, "aborting old transaction %u", txn->xid);
1770 
1771  /* remove potential on-disk data, and deallocate this tx */
1772  ReorderBufferCleanupTXN(rb, txn);
1773  }
1774  else
1775  return;
1776  }
1777 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define elog
Definition: elog.h:219
void ReorderBufferAddInvalidations ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 1962 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1965 {
1966  ReorderBufferTXN *txn;
1967 
1968  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1969 
1970  if (txn->ninvalidations != 0)
1971  elog(ERROR, "only ever add one set of invalidations");
1972 
1973  Assert(nmsgs > 0);
1974 
1975  txn->ninvalidations = nmsgs;
1976  txn->invalidations = (SharedInvalidationMessage *)
1977  MemoryContextAlloc(rb->context,
1978  sizeof(SharedInvalidationMessage) * nmsgs);
1979  memcpy(txn->invalidations, msgs,
1980  sizeof(SharedInvalidationMessage) * nmsgs);
1981 }
#define ERROR
Definition: elog.h:43
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
#define elog
Definition: elog.h:219
void ReorderBufferAddNewCommandId ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1918 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1920 {
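1921  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1922 
1923  change->data.command_id = cid;
1924  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1925 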
1926  ReorderBufferQueueChange(rb, xid, lsn, change);
1927 }
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void ReorderBufferAddNewTupleCids ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  pt,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1934 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, and ReorderBufferTXN::tuplecids.

Referenced by SnapBuildProcessNewCid().

1938 {
1939  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1940  ReorderBufferTXN *txn;
1941 
1942  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1943 
1944  change->data.tuplecid.node = node;
1945  change->data.tuplecid.tid = tid;
1946  change->data.tuplecid.cmin = cmin;
1947  change->data.tuplecid.cmax = cmax;
1948  change->data.tuplecid.combocid = combocid;
1949  change->lsn = lsn;
1950  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1951 
1952  dlist_push_tail(&txn->tuplecids, &change->node);
1953  txn->ntuplecids++;
1954 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@52 tuplecid
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids
void ReorderBufferAddSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData * snap 
)
ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 226 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_changes, ReorderBuffer::cached_transactions, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::nr_cached_changes, ReorderBuffer::nr_cached_transactions, ReorderBuffer::nr_cached_tuplebufs, NULL, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, slist_init(), and ReorderBuffer::toplevel_by_lsn.

Referenced by StartupDecodingContext().

227 {
228  ReorderBuffer *buffer;
229  HASHCTL hash_ctl;
230  MemoryContext new_ctx;
231 
232  /* allocate memory in own context, to have better accountability */
233  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234  "ReorderBuffer",
235  ALLOCSET_DEFAULT_SIZES);
236 
237  buffer =
238  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
239 
240  memset(&hash_ctl, 0, sizeof(hash_ctl));
241 
242  buffer->context = new_ctx;
243 
244  hash_ctl.keysize = sizeof(TransactionId);
245  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
246  hash_ctl.hcxt = buffer->context;
247 
248  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
249  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
250 
251  buffer->by_txn_last_xid = InvalidTransactionId;
252  buffer->by_txn_last_txn = NULL;
253 
254  buffer->nr_cached_transactions = 0;
255  buffer->nr_cached_changes = 0;
256  buffer->nr_cached_tuplebufs = 0;
257 
258  buffer->outbuf = NULL;
259  buffer->outbufsize = 0;
260 
261  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
262 
263  dlist_init(&buffer->toplevel_by_lsn);
264  dlist_init(&buffer->cached_transactions);
265  dlist_init(&buffer->cached_changes);
266  slist_init(&buffer->cached_tuplebufs);
267 
268  return buffer;
269 }
dlist_head cached_changes
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:394
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
static void slist_init(slist_head *head)
Definition: ilist.h:554
Size nr_cached_transactions
Size nr_cached_changes
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
dlist_head cached_transactions
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:226
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
slist_head cached_tuplebufs
void ReorderBufferAssignChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn 
)

Definition at line 741 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeXactOp().

743 {
744  ReorderBufferTXN *txn;
745  ReorderBufferTXN *subtxn;
746  bool new_top;
747  bool new_sub;
748 
749  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
750  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
751 
752  if (new_sub)
753  {
754  /*
755  * we assign subtransactions to top level transaction even if we don't
756  * have data for it yet, assignment records frequently reference xids
757  * that have not yet produced any records. Knowing those aren't top
758  * level xids allows us to make processing cheaper in some places.
759  */
760  dlist_push_tail(&txn->subtxns, &subtxn->node);
761  txn->nsubtxns++;
762  }
763  else if (!subtxn->is_known_as_subxact)
764  {
765  subtxn->is_known_as_subxact = true;
766  Assert(subtxn->nsubtxns == 0);
767 
768  /* remove from lsn order list of top-level transactions */
769  dlist_delete(&subtxn->node);
770 
771  /* add to toplevel transaction */
772  dlist_push_tail(&txn->subtxns, &subtxn->node);
773  txn->nsubtxns++;
774  }
775  else if (new_top)
776  {
777  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
778  }
779 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:671
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
void ReorderBufferCommit ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1353 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, NULL, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, and ReorderBufferTXN::tuplecid_hash.

Referenced by DecodeCommit().

1357 {
1358  ReorderBufferTXN *txn;
1359  volatile Snapshot snapshot_now;
1360  volatile CommandId command_id = FirstCommandId;
1361  bool using_subtxn;
1362  ReorderBufferIterTXNState *volatile iterstate = NULL;
1363 
1364  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1365  false);
1366 
1367  /* unknown transaction, nothing to replay */
1368  if (txn == NULL)
1369  return;
1370 
1371  txn->final_lsn = commit_lsn;
1372  txn->end_lsn = end_lsn;
1373  txn->commit_time = commit_time;
1374  txn->origin_id = origin_id;
1375  txn->origin_lsn = origin_lsn;
1376 
1377  /*
1378  * If this transaction didn't have any real changes in our database, it's
1379  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1380  * transferred its snapshot to this transaction if it had one and the
1381  * toplevel tx didn't.
1382  */
1383  if (txn->base_snapshot == NULL)
1384  {
1385  Assert(txn->ninvalidations == 0);
1386  ReorderBufferCleanupTXN(rb, txn);
1387  return;
1388  }
1389 
1390  snapshot_now = txn->base_snapshot;
1391 
1392  /* build data to be able to lookup the CommandIds of catalog tuples */
1394 
1395  /* setup the initial snapshot */
1396  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1397 
1398  /*
1399  * Decoding needs access to syscaches et al., which in turn use
1400  * heavyweight locks and such. Thus we need to have enough state around to
1401  * keep track of those. The easiest way is to simply use a transaction
1402  * internally. That also allows us to easily enforce that nothing writes
1403  * to the database by checking for xid assignments.
1404  *
1405  * When we're called via the SQL SRF there's already a transaction
1406  * started, so start an explicit subtransaction there.
1407  */
1408  using_subtxn = IsTransactionOrTransactionBlock();
1409 
1410  PG_TRY();
1411  {
1412  ReorderBufferChange *change;
1413  ReorderBufferChange *specinsert = NULL;
1414 
1415  if (using_subtxn)
1416  BeginInternalSubTransaction("replay");
1417  else
1418  StartTransactionCommand();
1419 
1420  rb->begin(rb, txn);
1421 
1422  iterstate = ReorderBufferIterTXNInit(rb, txn);
1423  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1424  {
1425  Relation relation = NULL;
1426  Oid reloid;
1427 
1428  switch (change->action)
1429  {
1431 
1432  /*
1433  * Confirmation for speculative insertion arrived. Simply
1434  * use as a normal record. It'll be cleaned up at the end
1435  * of INSERT processing.
1436  */
1437  Assert(specinsert->data.tp.oldtuple == NULL);
1438  change = specinsert;
1440 
1441  /* intentionally fall through */
1445  Assert(snapshot_now);
1446 
1447  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1448  change->data.tp.relnode.relNode);
1449 
1450  /*
1451  * Catalog tuple without data, emitted while catalog was
1452  * in the process of being rewritten.
1453  */
1454  if (reloid == InvalidOid &&
1455  change->data.tp.newtuple == NULL &&
1456  change->data.tp.oldtuple == NULL)
1457  goto change_done;
1458  else if (reloid == InvalidOid)
1459  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1460  relpathperm(change->data.tp.relnode,
1461  MAIN_FORKNUM));
1462 
1463  relation = RelationIdGetRelation(reloid);
1464 
1465  if (relation == NULL)
1466  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1467  reloid,
1468  relpathperm(change->data.tp.relnode,
1469  MAIN_FORKNUM));
1470 
1471  if (!RelationIsLogicallyLogged(relation))
1472  goto change_done;
1473 
1474  /*
1475  * For now ignore sequence changes entirely. Most of the
1476  * time they don't log changes using records we
1477  * understand, so it doesn't make sense to handle the few
1478  * cases we do.
1479  */
1480  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1481  goto change_done;
1482 
1483  /* user-triggered change */
1484  if (!IsToastRelation(relation))
1485  {
1486  ReorderBufferToastReplace(rb, txn, relation, change);
1487  rb->apply_change(rb, txn, relation, change);
1488 
1489  /*
1490  * Only clear reassembled toast chunks if we're sure
1491  * they're not required anymore. The creator of the
1492  * tuple tells us.
1493  */
1494  if (change->data.tp.clear_toast_afterwards)
1495  ReorderBufferToastReset(rb, txn);
1496  }
1497  /* we're not interested in toast deletions */
1498  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1499  {
1500  /*
1501  * Need to reassemble the full toasted Datum in
1502  * memory, to ensure the chunks don't get reused till
1503  * we're done remove it from the list of this
1504  * transaction's changes. Otherwise it will get
1505  * freed/reused while restoring spooled data from
1506  * disk.
1507  */
1508  dlist_delete(&change->node);
1509  ReorderBufferToastAppendChunk(rb, txn, relation,
1510  change);
1511  }
1512 
1513  change_done:
1514 
1515  /*
1516  * Either speculative insertion was confirmed, or it was
1517  * unsuccessful and the record isn't needed anymore.
1518  */
1519  if (specinsert != NULL)
1520  {
1521  ReorderBufferReturnChange(rb, specinsert);
1522  specinsert = NULL;
1523  }
1524 
1525  if (relation != NULL)
1526  {
1527  RelationClose(relation);
1528  relation = NULL;
1529  }
1530  break;
1531 
1533 
1534  /*
1535  * Speculative insertions are dealt with by delaying the
1536  * processing of the insert until the confirmation record
1537  * arrives. For that we simply unlink the record from the
1538  * chain, so it does not get freed/reused while restoring
1539  * spooled data from disk.
1540  *
1541  * This is safe in the face of concurrent catalog changes
1542  * because the relevant relation can't be changed between
1543  * speculative insertion and confirmation due to
1544  * CheckTableNotInUse() and locking.
1545  */
1546 
1547  /* clear out a pending (and thus failed) speculation */
1548  if (specinsert != NULL)
1549  {
1550  ReorderBufferReturnChange(rb, specinsert);
1551  specinsert = NULL;
1552  }
1553 
1554  /* and memorize the pending insertion */
1555  dlist_delete(&change->node);
1556  specinsert = change;
1557  break;
1558 
1560  rb->message(rb, txn, change->lsn, true,
1561  change->data.msg.prefix,
1562  change->data.msg.message_size,
1563  change->data.msg.message);
1564  break;
1565 
1567  /* get rid of the old */
1568  TeardownHistoricSnapshot(false);
1569 
1570  if (snapshot_now->copied)
1571  {
1572  ReorderBufferFreeSnap(rb, snapshot_now);
1573  snapshot_now =
1574  ReorderBufferCopySnap(rb, change->data.snapshot,
1575  txn, command_id);
1576  }
1577 
1578  /*
1579  * Restored from disk, need to be careful not to double
1580  * free. We could introduce refcounting for that, but for
1581  * now this seems infrequent enough not to care.
1582  */
1583  else if (change->data.snapshot->copied)
1584  {
1585  snapshot_now =
1586  ReorderBufferCopySnap(rb, change->data.snapshot,
1587  txn, command_id);
1588  }
1589  else
1590  {
1591  snapshot_now = change->data.snapshot;
1592  }
1593 
1594 
1595  /* and continue with the new one */
1596  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1597  break;
1598 
1600  Assert(change->data.command_id != InvalidCommandId);
1601 
1602  if (command_id < change->data.command_id)
1603  {
1604  command_id = change->data.command_id;
1605 
1606  if (!snapshot_now->copied)
1607  {
1608  /* we don't use the global one anymore */
1609  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1610  txn, command_id);
1611  }
1612 
1613  snapshot_now->curcid = command_id;
1614 
1615  TeardownHistoricSnapshot(false);
1616  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1617 
1618  /*
1619  * Every time the CommandId is incremented, we could
1620  * see new catalog contents, so execute all
1621  * invalidations.
1622  */
1624  }
1625 
1626  break;
1627 
1629  elog(ERROR, "tuplecid value in changequeue");
1630  break;
1631  }
1632  }
1633 
1634  /*
1635  * There's a speculative insertion remaining, just clean it up, it
1636  * can't have been successful, otherwise we'd have gotten a confirmation
1637  * record.
1638  */
1639  if (specinsert)
1640  {
1641  ReorderBufferReturnChange(rb, specinsert);
1642  specinsert = NULL;
1643  }
1644 
1645  /* clean up the iterator */
1646  ReorderBufferIterTXNFinish(rb, iterstate);
1647  iterstate = NULL;
1648 
1649  /* call commit callback */
1650  rb->commit(rb, txn, commit_lsn);
1651 
1652  /* this is just a sanity check against bad output plugin behaviour */
1654  elog(ERROR, "output plugin used XID %u",
1656 
1657  /* cleanup */
1658  TeardownHistoricSnapshot(false);
1659 
1660  /*
1661  * Aborting the current (sub-)transaction as a whole has the right
1662  * semantics. We want all locks acquired in here to be released, not
1663  * reassigned to the parent and we do not want any database access
1664  * have persistent effects.
1665  */
1667 
1668  /* make sure there's no cache pollution */
1670 
1671  if (using_subtxn)
1673 
1674  if (snapshot_now->copied)
1675  ReorderBufferFreeSnap(rb, snapshot_now);
1676 
1677  /* remove potential on-disk data, and deallocate */
1678  ReorderBufferCleanupTXN(rb, txn);
1679  }
1680  PG_CATCH();
1681  {
1682  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1683  if (iterstate)
1684  ReorderBufferIterTXNFinish(rb, iterstate);
1685 
1687 
1688  /*
1689  * Force cache invalidation to happen outside of a valid transaction
1690  * to prevent catalog access as we just caught an error.
1691  */
1693 
1694  /* make sure there's no cache pollution */
1696 
1697  if (using_subtxn)
1699 
1700  if (snapshot_now->copied)
1701  ReorderBufferFreeSnap(rb, snapshot_now);
1702 
1703  /* remove potential on-disk data, and deallocate */
1704  ReorderBufferCleanupTXN(rb, txn);
1705 
1706  PG_RE_THROW();
1707  }
1708  PG_END_TRY();
1709 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:408
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:94
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@49 data
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1965
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:572
Form_pg_class rd_rel
Definition: rel.h:113
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:410
struct ReorderBufferChange::@49::@51 msg
struct ReorderBufferChange::@49::@50 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:416
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4155
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:433
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2155
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:411
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:96
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2675
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4051
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1949
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2066
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferCommitChild ( ReorderBuffer * ,
TransactionId  ,
TransactionId  ,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 786 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeCommit().

789 {
790  ReorderBufferTXN *txn;
791  ReorderBufferTXN *subtxn;
792 
793  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
794  InvalidXLogRecPtr, false);
795 
796  /*
797  * No need to do anything if that subtxn didn't contain any changes
798  */
799  if (!subtxn)
800  return;
801 
802  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
803 
804  if (txn == NULL)
805  elog(ERROR, "subxact logged without previous toplevel record");
806 
807  /*
808  * Pass our base snapshot to the parent transaction if it doesn't have
809  * one, or ours is older. That can happen if there are no changes in the
810  * toplevel transaction but in one of the child transactions. This allows
811  * the parent to simply use its base snapshot initially.
812  */
813  if (subtxn->base_snapshot != NULL &&
814  (txn->base_snapshot == NULL ||
815  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
816  {
817  txn->base_snapshot = subtxn->base_snapshot;
818  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
819  subtxn->base_snapshot = NULL;
820  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
821  }
822 
823  subtxn->final_lsn = commit_lsn;
824  subtxn->end_lsn = end_lsn;
825 
826  if (!subtxn->is_known_as_subxact)
827  {
828  subtxn->is_known_as_subxact = true;
829  Assert(subtxn->nsubtxns == 0);
830 
831  /* remove from lsn order list of top-level transactions */
832  dlist_delete(&subtxn->node);
833 
834  /* add to subtransaction list */
835  dlist_push_tail(&txn->subtxns, &subtxn->node);
836  txn->nsubtxns++;
837  }
838 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
void ReorderBufferForget ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn 
)

Definition at line 1793 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, NULL, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1794 {
1795  ReorderBufferTXN *txn;
1796 
1797  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1798  false);
1799 
1800  /* unknown, nothing to forget */
1801  if (txn == NULL)
1802  return;
1803 
1804  /* cosmetic... */
1805  txn->final_lsn = lsn;
1806 
1807  /*
1808  * Process cache invalidation messages if there are any. Even if we're not
1809  * interested in the transaction's contents, it could have manipulated the
1810  * catalog and we need to update the caches according to that.
1811  */
1812  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1813  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1814  txn->invalidations);
1815  else
1816  Assert(txn->ninvalidations == 0);
1817 
1818  /* remove potential on-disk data, and deallocate */
1819  ReorderBufferCleanupTXN(rb, txn);
1820 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferFree ( ReorderBuffer * )

Definition at line 275 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

276 {
277  MemoryContext context = rb->context;
278 
279  /*
280  * We free separately allocated data by entirely scrapping reorderbuffer's
281  * memory context.
282  */
283  MemoryContextDelete(context);
284 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * )

Definition at line 365 of file reorderbuffer.c.

References ReorderBuffer::cached_changes, ReorderBuffer::context, dlist_container, dlist_pop_head_node(), MemoryContextAlloc(), and ReorderBuffer::nr_cached_changes.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

366 {
367  ReorderBufferChange *change;
368 
369  /* check the slab cache */
370  if (rb->nr_cached_changes)
371  {
372  rb->nr_cached_changes--;
373  change = (ReorderBufferChange *)
374  dlist_container(ReorderBufferChange, node,
375  dlist_pop_head_node(&rb->cached_changes));
376  }
377  else
378  {
379  change = (ReorderBufferChange *)
380  MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
381  }
382 
383  memset(change, 0, sizeof(ReorderBufferChange));
384  return change;
385 }
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * )

Definition at line 718 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, NULL, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

719 {
720  ReorderBufferTXN *txn;
721 
722  if (dlist_is_empty(&rb->toplevel_by_lsn))
723  return NULL;
724 
725  AssertTXNLsnOrder(rb);
726 
727  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
728 
729  Assert(!txn->is_known_as_subxact);
730  Assert(txn->first_lsn != InvalidXLogRecPtr);
731  return txn;
732 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * ,
Size  tuple_len 
)

Definition at line 457 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, Assert, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, MaxHeapTupleSize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBufferTupleBufData, SizeofHeapTupleHeader, slist_container, slist_pop_head_node(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

458 {
459  ReorderBufferTupleBuf *tuple;
460  Size alloc_len;
461 
462  alloc_len = tuple_len + SizeofHeapTupleHeader;
463 
464  /*
465  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
466  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
467  * generated for oldtuples can be bigger, as they don't have out-of-line
468  * toast columns.
469  */
470  if (alloc_len < MaxHeapTupleSize)
471  alloc_len = MaxHeapTupleSize;
472 
473 
474  /* if small enough, check the slab cache */
475  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
476  {
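477  rb->nr_cached_tuplebufs--;
478  tuple = slist_container(ReorderBufferTupleBuf, node,
479  slist_pop_head_node(&rb->cached_tuplebufs));
480  Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
481 #ifdef USE_ASSERT_CHECKING
482  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
483  VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
484 #endif
485  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
486 #ifdef USE_ASSERT_CHECKING
487  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
488  VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
489 #endif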
490  }
491  else
492  {
493  tuple = (ReorderBufferTupleBuf *)
494  MemoryContextAlloc(rb->context,
495  sizeof(ReorderBufferTupleBuf) +
496  MAXIMUM_ALIGNOF + alloc_len);
497  tuple->alloc_tuple_size = alloc_len;
498  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
499  }
500 
501  return tuple;
502 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
#define Assert(condition)
Definition: c.h:671
size_t Size
Definition: c.h:353
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
void ReorderBufferImmediateInvalidation ( ReorderBuffer * ,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 1829 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1831 {
1832  bool use_subtxn = IsTransactionOrTransactionBlock();
1833  int i;
1834 
1835  if (use_subtxn)
1836  BeginInternalSubTransaction("replay");
1837 
1838  /*
1839  * Force invalidations to happen outside of a valid transaction - that way
1840  * entries will just be marked as invalid without accessing the catalog.
1841  * That's advantageous because we don't need to setup the full state
1842  * necessary for catalog access.
1843  */
1844  if (use_subtxn)
1845  AbortCurrentTransaction();
1846 
1847  for (i = 0; i < ninvalidations; i++)
1848  LocalExecuteInvalidationMessage(&invalidations[i]);
1849 
1850  if (use_subtxn)
1851  RollbackAndReleaseCurrentSubTransaction();
1852 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4155
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4051
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:547
int i
void ReorderBufferProcessXid ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1865 of file reorderbuffer.c.

References InvalidTransactionId, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1866 {
1867  /* many records won't have an xid assigned, centralize check here */
1868  if (xid != InvalidTransactionId)
1869  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1870 }
#define InvalidTransactionId
Definition: transam.h:31
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferQueueChange ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
ReorderBufferChange *  
)

Definition at line 620 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, NULL, ReorderBufferCheckSerializeTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

622 {
623  ReorderBufferTXN *txn;
624 
625  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
626 
627  change->lsn = lsn;
628  Assert(InvalidXLogRecPtr != lsn);
629  dlist_push_tail(&txn->changes, &change->node);
630  txn->nentries++;
631  txn->nentries_mem++;
632 
633  ReorderBufferCheckSerializeTXN(rb, txn);
634 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage ( ReorderBuffer * ,
TransactionId  ,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 640 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, NULL, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by DecodeLogicalMsgOp().

644 {
645  if (transactional)
646  {
647  MemoryContext oldcontext;
648  ReorderBufferChange *change;
649 
650  Assert(xid != InvalidTransactionId);
651 
652  oldcontext = MemoryContextSwitchTo(rb->context);
653 
654  change = ReorderBufferGetChange(rb);
655  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
656  change->data.msg.prefix = pstrdup(prefix);
657  change->data.msg.message_size = message_size;
658  change->data.msg.message = palloc(message_size);
659  memcpy(change->data.msg.message, message, message_size);
660 
661  ReorderBufferQueueChange(rb, xid, lsn, change);
662 
663  MemoryContextSwitchTo(oldcontext);
664  }
665  else
666  {
667  ReorderBufferTXN *txn = NULL;
668  volatile Snapshot snapshot_now = snapshot;
669 
670  if (xid != InvalidTransactionId)
671  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
672 
673  /* setup snapshot to allow catalog access */
674  SetupHistoricSnapshot(snapshot_now, NULL);
675  PG_TRY();
676  {
677  rb->message(rb, txn, lsn, false, prefix, message_size, message);
678 
679  TeardownHistoricSnapshot(false);
680  }
681  PG_CATCH();
682  {
683  TeardownHistoricSnapshot(true);
684  PG_RE_THROW();
685  }
686  PG_END_TRY();
687  }
688 }
char * pstrdup(const char *in)
Definition: mcxt.c:1165
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
union ReorderBufferChange::@49 data
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1965
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@51 msg
#define InvalidTransactionId
Definition: transam.h:31
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:891
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1949
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferReturnChange ( ReorderBuffer * ,
ReorderBufferChange *  
)

Definition at line 394 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBuffer::cached_changes, ReorderBufferChange::data, dlist_push_head(), max_cached_changes, ReorderBufferChange::msg, ReorderBufferChange::node, ReorderBuffer::nr_cached_changes, NULL, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

395 {
396  /* free contained data */
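397  switch (change->action)
398  {
399  case REORDER_BUFFER_CHANGE_INSERT:
400  case REORDER_BUFFER_CHANGE_UPDATE:
401  case REORDER_BUFFER_CHANGE_DELETE:
402  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
403  if (change->data.tp.newtuple)
404  {
405  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
406  change->data.tp.newtuple = NULL;
407  }
408 
409  if (change->data.tp.oldtuple)
410  {
411  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
412  change->data.tp.oldtuple = NULL;
413  }
414  break;
415  case REORDER_BUFFER_CHANGE_MESSAGE:
416  if (change->data.msg.prefix != NULL)
417  pfree(change->data.msg.prefix);
418  change->data.msg.prefix = NULL;
419  if (change->data.msg.message != NULL)
420  pfree(change->data.msg.message);
421  change->data.msg.message = NULL;
422  break;
423  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
424  if (change->data.snapshot)
425  {
426  ReorderBufferFreeSnap(rb, change->data.snapshot);
427  change->data.snapshot = NULL;
428  }
429  break;
430  /* no data in addition to the struct itself */
431  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
432  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
433  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
434  break;
435  }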
436 
437  /* check whether to put into the slab cache */
438  if (rb->nr_cached_changes < max_cached_changes)
439  {
440  rb->nr_cached_changes++;
441  dlist_push_head(&rb->cached_changes, &change->node);
442  VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
443  VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
444  }
445  else
446  {
447  pfree(change);
448  }
449 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:300
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
void pfree(void *pointer)
Definition: mcxt.c:992
static const Size max_cached_changes
#define NULL
Definition: c.h:226
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnTupleBuf ( ReorderBuffer * ,
ReorderBufferTupleBuf *  tuple 
)

Definition at line 511 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, ReorderBuffer::cached_tuplebufs, max_cached_tuplebufs, MaxHeapTupleSize, ReorderBufferTupleBuf::node, ReorderBuffer::nr_cached_tuplebufs, pfree(), slist_push_head(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferReturnChange().

512 {
513  /* check whether to put into the slab cache, oversized tuples never are */
514  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
515  rb->nr_cached_tuplebufs < max_cached_tuplebufs)
516  {
517  rb->nr_cached_tuplebufs++;
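518  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
519  VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
520  VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
521  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
522  VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
523  }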
524  else
525  {
526  pfree(tuple);
527  }
528 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static const Size max_cached_tuplebufs
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:992
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
void ReorderBufferSetBaseSnapshot ( ReorderBuffer * ,
TransactionId  ,
XLogRecPtr  lsn,
struct SnapshotData *  snap 
)
void ReorderBufferSetRestartPoint ( ReorderBuffer * ,
XLogRecPtr  ptr 
)

Definition at line 735 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

736 {
737  rb->current_restart_decoding_lsn = ptr;
738 }
bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2031 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2032 {
2033  ReorderBufferTXN *txn;
2034 
2035  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2036  false);
2037 
2038  /* transaction isn't known yet, ergo no snapshot */
2039  if (txn == NULL)
2040  return false;
2041 
2042  /*
2043  * TODO: It would be a nice improvement if we would check the toplevel
2044  * transaction in subtransactions, but we'd need to keep track of a bit
2045  * more state.
2046  */
2047  return txn->base_snapshot != NULL;
2048 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * ,
TransactionId  xid 
)

Definition at line 2015 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn().

2016 {
2017  ReorderBufferTXN *txn;
2018 
2019  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2020  false);
2021  if (txn == NULL)
2022  return false;
2023 
2024  return txn->has_catalog_changes;
2025 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * ,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2000 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

2002 {
2003  ReorderBufferTXN *txn;
2004 
2005  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2006 
2007  txn->has_catalog_changes = true;
2008 }
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void StartupReorderBuffer ( void  )

Definition at line 2639 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, NULL, PANIC, ReadDir(), ReplicationSlotValidateName(), and unlink().

Referenced by StartupXLOG().

2640 {
2641  DIR *logical_dir;
2642  struct dirent *logical_de;
2643 
2644  DIR *spill_dir;
2645  struct dirent *spill_de;
2646 
2647  logical_dir = AllocateDir("pg_replslot");
2648  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2649  {
2650  struct stat statbuf;
2651  char path[MAXPGPATH];
2652 
2653  if (strcmp(logical_de->d_name, ".") == 0 ||
2654  strcmp(logical_de->d_name, "..") == 0)
2655  continue;
2656 
2657  /* if it cannot be a slot, skip the directory */
2658  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2659  continue;
2660 
2661  /*
2662  * ok, has to be a surviving logical slot, iterate and delete
2663  * everything starting with xid-*
2664  */
2665  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2666 
2667  /* we're only creating directories here, skip if it's not our's */
2668  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2669  continue;
2670 
2671  spill_dir = AllocateDir(path);
2672  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2673  {
2674  if (strcmp(spill_de->d_name, ".") == 0 ||
2675  strcmp(spill_de->d_name, "..") == 0)
2676  continue;
2677 
2678  /* only look at names that can be ours */
2679  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2680  {
2681  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2682  spill_de->d_name);
2683 
2684  if (unlink(path) != 0)
2685  ereport(PANIC,
2686  (errcode_for_file_access(),
2687  errmsg("could not remove file \"%s\": %m",
2688  path)));
2689  }
2690  }
2691  FreeDir(spill_dir);
2692  }
2693  FreeDir(logical_dir);
2694 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2284
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:226
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2350
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define lstat(path, sb)
Definition: win32.h:272
int FreeDir(DIR *dir)
Definition: fd.c:2393