PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const void *a_p, const void *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3069 of file reorderbuffer.c.

References Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3070 {
3071  char path[MAXPGPATH];
3072  int fd;
3073  int readBytes;
3074  LogicalRewriteMappingData map;
3075 
3076  sprintf(path, "pg_logical/mappings/%s", fname);
3077  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3078  if (fd < 0)
3079  ereport(ERROR,
3080  (errcode_for_file_access(),
3081  errmsg("could not open file \"%s\": %m", path)));
3082 
3083  while (true)
3084  {
3085  ReorderBufferTupleCidKey key;
3086  ReorderBufferTupleCidEnt *ent;
3087  ReorderBufferTupleCidEnt *new_ent;
3088  bool found;
3089 
3090  /* be careful about padding */
3091  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3092 
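3093  /* read all mappings till the end of the file */
3094  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3095  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3096  pgstat_report_wait_end();
3097 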
3098  if (readBytes < 0)
3099  ereport(ERROR,
3100  (errcode_for_file_access(),
3101  errmsg("could not read file \"%s\": %m",
3102  path)));
3103  else if (readBytes == 0) /* EOF */
3104  break;
3105  else if (readBytes != sizeof(LogicalRewriteMappingData))
3106  ereport(ERROR,
3107  (errcode_for_file_access(),
3108  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3109  path, readBytes,
3110  (int32) sizeof(LogicalRewriteMappingData))));
3111 
3112  key.relnode = map.old_node;
3113  ItemPointerCopy(&map.old_tid,
3114  &key.tid);
3115 
3116 
3117  ent = (ReorderBufferTupleCidEnt *)
3118  hash_search(tuplecid_data,
3119  (void *) &key,
3120  HASH_FIND,
3121  NULL);
3122 
3123  /* no existing mapping, no need to update */
3124  if (!ent)
3125  continue;
3126 
3127  key.relnode = map.new_node;
3128  ItemPointerCopy(&map.new_tid,
3129  &key.tid);
3130 
3131  new_ent = (ReorderBufferTupleCidEnt *)
3132  hash_search(tuplecid_data,
3133  (void *) &key,
3134  HASH_ENTER,
3135  &found);
3136 
3137  if (found)
3138  {
3139  /*
3140  * Make sure the existing mapping makes sense. We sometime update
3141  * old records that did not yet have a cmax (e.g. pg_class' own
3142  * entry while rewriting it) during rewrites, so allow that.
3143  */
3144  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3145  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3146  }
3147  else
3148  {
3149  /* update mapping */
3150  new_ent->cmin = ent->cmin;
3151  new_ent->cmax = ent->cmax;
3152  new_ent->combocid = ent->combocid;
3153  }
3154  }
3155 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
signed int int32
Definition: c.h:313
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2386
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvalidCommandId
Definition: c.h:491
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:699
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 621 of file reorderbuffer.c.

References Assert, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by ReorderBufferGetOldestTXN(), and ReorderBufferTXNByXid().

622 {
623 #ifdef USE_ASSERT_CHECKING
624  dlist_iter iter;
625  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
626 
627  dlist_foreach(iter, &rb->toplevel_by_lsn)
628  {
629  ReorderBufferTXN *cur_txn;
630 
631  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
632  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
633 
634  if (cur_txn->end_lsn != InvalidXLogRecPtr)
635  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
636 
637  if (prev_first_lsn != InvalidXLogRecPtr)
638  Assert(prev_first_lsn < cur_txn->first_lsn);
639 
640  Assert(!cur_txn->is_known_as_subxact);
641  prev_first_lsn = cur_txn->first_lsn;
642  }
643 #endif
644 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
XLogRecPtr end_lsn

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const void *  a_p,
const void *  b_p 
)
static

Definition at line 3172 of file reorderbuffer.c.

References RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3173 {
3174  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3175  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3176 
3177  if (a->lsn < b->lsn)
3178  return -1;
3179  else if (a->lsn > b->lsn)
3180  return 1;
3181  return 0;
3182 }

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1692 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1693 {
1694  ReorderBufferTXN *txn;
1695 
1696  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1697  false);
1698 
1699  /* unknown, nothing to remove */
1700  if (txn == NULL)
1701  return;
1702 
1703  /* cosmetic... */
1704  txn->final_lsn = lsn;
1705 
1706  /* remove potential on-disk data, and deallocate */
1707  ReorderBufferCleanupTXN(rb, txn);
1708 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 1718 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1719 {
1720  dlist_mutable_iter it;
1721 
1722  /*
1723  * Iterate through all (potential) toplevel TXNs and abort all that are
1724  * older than what possibly can be running. Once we've found the first
1725  * that is alive we stop, there might be some that acquired an xid earlier
1726  * but started writing later, but it's unlikely and they will be cleaned
1727  * up in a later call to this function.
1728  */
1729  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1730  {
1731  ReorderBufferTXN *txn;
1732 
1733  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1734 
1735  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1736  {
1737  /*
1738  * We set final_lsn on a transaction when we decode its commit or
1739  * abort record, but we never see those records for crashed
1740  * transactions. To ensure cleanup of these transactions, set
1741  * final_lsn to that of their last change; this causes
1742  * ReorderBufferRestoreCleanup to do the right thing.
1743  */
1744  if (txn->serialized && txn->final_lsn == 0)
1745  {
1746  ReorderBufferChange *last =
1747  dlist_tail_element(ReorderBufferChange, node, &txn->changes);
1748 
1749  txn->final_lsn = last->lsn;
1750  }
1751 
1752  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1753 
1754  /* remove potential on-disk data, and deallocate this tx */
1755  ReorderBufferCleanupTXN(rb, txn);
1756  }
1757  else
1758  return;
1759  }
1760 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog
Definition: elog.h:219

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 1945 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1948 {
1949  ReorderBufferTXN *txn;
1950 
1951  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1952 
1953  if (txn->ninvalidations != 0)
1954  elog(ERROR, "only ever add one set of invalidations");
1955 
1956  Assert(nmsgs > 0);
1957 
1958  txn->ninvalidations = nmsgs;
1959  txn->invalidations = (SharedInvalidationMessage *)
1960  MemoryContextAlloc(rb->context,
1961  sizeof(SharedInvalidationMessage) * nmsgs);
1962  memcpy(txn->invalidations, msgs,
1963  sizeof(SharedInvalidationMessage) * nmsgs);
1964 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:699
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
#define elog
Definition: elog.h:219

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1901 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

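1903 {
1904  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1905 
1906  change->data.command_id = cid;
1907  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1908 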
1909  ReorderBufferQueueChange(rb, xid, lsn, change);
1910 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1917 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

1921 {
1922  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1923  ReorderBufferTXN *txn;
1924 
1925  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1926 
1927  change->data.tuplecid.node = node;
1928  change->data.tuplecid.tid = tid;
1929  change->data.tuplecid.cmin = cmin;
1930  change->data.tuplecid.cmax = cmax;
1931  change->data.tuplecid.combocid = combocid;
1932  change->lsn = lsn;
1933  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1934 
1935  dlist_push_tail(&txn->tuplecids, &change->node);
1936  txn->ntuplecids++;
1937 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@106 tuplecid
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1861 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

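1863 {
1864  ReorderBufferChange *change = ReorderBufferGetChange(rb);
1865 
1866  change->data.snapshot = snap;
1867  change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1868 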
1869  ReorderBufferQueueChange(rb, xid, lsn, change);
1870 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 224 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

225 {
226  ReorderBuffer *buffer;
227  HASHCTL hash_ctl;
228  MemoryContext new_ctx;
229 
230  Assert(MyReplicationSlot != NULL);
231 
232  /* allocate memory in own context, to have better accountability */
233  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234  "ReorderBuffer",
235  ALLOCSET_DEFAULT_SIZES);
236 
237  buffer =
238  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
239 
240  memset(&hash_ctl, 0, sizeof(hash_ctl));
241 
242  buffer->context = new_ctx;
243 
244  buffer->change_context = SlabContextCreate(new_ctx,
245  "Change",
246  SLAB_DEFAULT_BLOCK_SIZE,
247  sizeof(ReorderBufferChange));
248 
249  buffer->txn_context = SlabContextCreate(new_ctx,
250  "TXN",
251  SLAB_DEFAULT_BLOCK_SIZE,
252  sizeof(ReorderBufferTXN));
253 
254  buffer->tup_context = GenerationContextCreate(new_ctx,
255  "Tuples",
256  SLAB_LARGE_BLOCK_SIZE);
257 
258  hash_ctl.keysize = sizeof(TransactionId);
259  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
260  hash_ctl.hcxt = buffer->context;
261 
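262  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
263  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
264 
265  buffer->by_txn_last_xid = InvalidTransactionId;
266  buffer->by_txn_last_txn = NULL;
267 
268  buffer->outbuf = NULL;
269  buffer->outbufsize = 0;
270 
271  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
272 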
273  dlist_init(&buffer->toplevel_by_lsn);
274 
275  /*
276  * Ensure there's no stale data from prior uses of this slot, in case some
277  * prior exit avoided calling ReorderBufferFree. Failure to do this can
278  * produce duplicated txns, and it's very cheap if there's nothing there.
279  */
280  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
281 
282  return buffer;
283 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:474
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:188
ReplicationSlotPersistentData data
Definition: slot.h:120
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
#define NameStr(name)
Definition: c.h:576
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 670 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

672 {
673  ReorderBufferTXN *txn;
674  ReorderBufferTXN *subtxn;
675  bool new_top;
676  bool new_sub;
677 
678  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
679  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
680 
681  if (new_sub)
682  {
683  /*
684  * we assign subtransactions to top level transaction even if we don't
685  * have data for it yet, assignment records frequently reference xids
686  * that have not yet produced any records. Knowing those aren't top
687  * level xids allows us to make processing cheaper in some places.
688  */
689  dlist_push_tail(&txn->subtxns, &subtxn->node);
690  txn->nsubtxns++;
691  }
692  else if (!subtxn->is_known_as_subxact)
693  {
694  subtxn->is_known_as_subxact = true;
695  Assert(subtxn->nsubtxns == 0);
696 
697  /* remove from lsn order list of top-level transactions */
698  dlist_delete(&subtxn->node);
699 
700  /* add to toplevel transaction */
701  dlist_push_tail(&txn->subtxns, &subtxn->node);
702  txn->nsubtxns++;
703  }
704  else if (new_top)
705  {
706  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
707  }
708 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:699
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1127 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1128 {
1129  dlist_iter iter;
1130  HASHCTL hash_ctl;
1131 
1132  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1133  return;
1134 
1135  memset(&hash_ctl, 0, sizeof(hash_ctl));
1136 
1137  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1138  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1139  hash_ctl.hcxt = rb->context;
1140 
1141  /*
1142  * create the hash with the exact number of to-be-stored tuplecids from
1143  * the start
1144  */
1145  txn->tuplecid_hash =
1146  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1147  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1148 
1149  dlist_foreach(iter, &txn->tuplecids)
1150  {
1151  ReorderBufferTupleCidKey key;
1152  ReorderBufferTupleCidEnt *ent;
1153  bool found;
1154  ReorderBufferChange *change;
1155 
1156  change = dlist_container(ReorderBufferChange, node, iter.cur);
1157 
1158  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1159 
1160  /* be careful about padding */
1161  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1162 
1163  key.relnode = change->data.tuplecid.node;
1164 
1165  ItemPointerCopy(&change->data.tuplecid.tid,
1166  &key.tid);
1167 
1168  ent = (ReorderBufferTupleCidEnt *)
1169  hash_search(txn->tuplecid_hash,
1170  (void *) &key,
1171  HASH_ENTER | HASH_FIND,
1172  &found);
1173  if (!found)
1174  {
1175  ent->cmin = change->data.tuplecid.cmin;
1176  ent->cmax = change->data.tuplecid.cmax;
1177  ent->combocid = change->data.tuplecid.combocid;
1178  }
1179  else
1180  {
1181  Assert(ent->cmin == change->data.tuplecid.cmin);
1182  Assert(ent->cmax == InvalidCommandId ||
1183  ent->cmax == change->data.tuplecid.cmax);
1184 
1185  /*
1186  * if the tuple got valid in this transaction and now got deleted
1187  * we already have a valid cmin stored. The cmax will be
1188  * InvalidCommandId though.
1189  */
1190  ent->cmax = change->data.tuplecid.cmax;
1191  }
1192  }
1193 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:491
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@106 tuplecid
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCheckSerializeTXN()

static void ReorderBufferCheckSerializeTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 2062 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2063 {
2064  /*
2065  * TODO: improve accounting so we cheaply can take subtransactions into
2066  * account here.
2067  */
2068  if (txn->nentries_mem >= max_changes_in_memory)
2069  {
2070  ReorderBufferSerializeTXN(rb, txn);
2071  Assert(txn->nentries_mem == 0);
2072  }
2073 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:699
static const Size max_changes_in_memory

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 2616 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf(), and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

2617 {
2618  DIR *spill_dir;
2619  struct dirent *spill_de;
2620  struct stat statbuf;
2621  char path[MAXPGPATH * 2 + 12];
2622 
2623  sprintf(path, "pg_replslot/%s", slotname);
2624 
2625  /* we're only handling directories here, skip if it's not ours */
2626  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2627  return;
2628 
2629  spill_dir = AllocateDir(path);
2630  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2631  {
2632  /* only look at names that can be ours */
2633  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2634  {
2635  snprintf(path, sizeof(path),
2636  "pg_replslot/%s/%s", slotname,
2637  spill_de->d_name);
2638 
2639  if (unlink(path) != 0)
2640  ereport(ERROR,
2641  (errcode_for_file_access(),
2642  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2643  path, slotname)));
2644  }
2645  }
2646  FreeDir(spill_dir);
2647 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2671
#define INFO
Definition: elog.h:33
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
Definition: dirent.h:9
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2590
#define ereport(elevel, rest)
Definition: elog.h:122
#define stat(a, b)
Definition: win32_port.h:266
#define S_ISDIR(m)
Definition: win32_port.h:307
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
int FreeDir(DIR *dir)
Definition: fd.c:2708

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1044 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::serialized, SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1045 {
1046  bool found;
1047  dlist_mutable_iter iter;
1048 
1049  /* cleanup subtransactions & their changes */
1050  dlist_foreach_modify(iter, &txn->subtxns)
1051  {
1052  ReorderBufferTXN *subtxn;
1053 
1054  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1055 
1056  /*
1057  * Subtransactions are always associated with the toplevel TXN, even if
1058  * they originally were happening inside another subtxn, so we won't
1059  * ever recurse more than one level deep here.
1060  */
1061  Assert(subtxn->is_known_as_subxact);
1062  Assert(subtxn->nsubtxns == 0);
1063 
1064  ReorderBufferCleanupTXN(rb, subtxn);
1065  }
1066 
1067  /* cleanup changes in the toplevel txn */
1068  dlist_foreach_modify(iter, &txn->changes)
1069  {
1070  ReorderBufferChange *change;
1071 
1072  change = dlist_container(ReorderBufferChange, node, iter.cur);
1073 
1074  ReorderBufferReturnChange(rb, change);
1075  }
1076 
1077  /*
1078  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1079  * They are always stored in the toplevel transaction.
1080  */
1081  dlist_foreach_modify(iter, &txn->tuplecids)
1082  {
1083  ReorderBufferChange *change;
1084 
1085  change = dlist_container(ReorderBufferChange, node, iter.cur);
1087  ReorderBufferReturnChange(rb, change);
1088  }
1089 
1090  if (txn->base_snapshot != NULL)
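1091  {
1092  SnapBuildSnapDecRefcount(txn->base_snapshot);
1093  txn->base_snapshot = NULL;
1094  txn->base_snapshot_lsn = InvalidXLogRecPtr;
1095  }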
1096 
1097  /*
1098  * Remove TXN from its containing list.
1099  *
1100  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1101  * parent's list of known subxacts; this leaves the parent's nsubtxns
1102  * count too high, but we don't care. Otherwise, we are deleting the TXN
1103  * from the LSN-ordered list of toplevel TXNs.
1104  */
1105  dlist_delete(&txn->node);
1106 
1107  /* now remove reference from buffer */
1108  hash_search(rb->by_txn,
1109  (void *) &txn->xid,
1110  HASH_REMOVE,
1111  &found);
1112  Assert(found);
1113 
1114  /* remove entries spilled to disk */
1115  if (txn->serialized)
1116  ReorderBufferRestoreCleanup(rb, txn);
1117 
1118  /* deallocate */
1119  ReorderBufferReturnTXN(rb, txn);
1120 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:699
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1282 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), 
ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1286 {
1287  ReorderBufferTXN *txn;
1288  volatile Snapshot snapshot_now;
1289  volatile CommandId command_id = FirstCommandId;
1290  bool using_subtxn;
1291  ReorderBufferIterTXNState *volatile iterstate = NULL;
1292 
1293  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1294  false);
1295 
1296  /* unknown transaction, nothing to replay */
1297  if (txn == NULL)
1298  return;
1299 
1300  txn->final_lsn = commit_lsn;
1301  txn->end_lsn = end_lsn;
1302  txn->commit_time = commit_time;
1303  txn->origin_id = origin_id;
1304  txn->origin_lsn = origin_lsn;
1305 
1306  /*
1307  * If this transaction didn't have any real changes in our database, it's
1308  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1309  * transferred its snapshot to this transaction if it had one and the
1310  * toplevel tx didn't.
1311  */
1312  if (txn->base_snapshot == NULL)
1313  {
1314  Assert(txn->ninvalidations == 0);
1315  ReorderBufferCleanupTXN(rb, txn);
1316  return;
1317  }
1318 
1319  snapshot_now = txn->base_snapshot;
1320 
1321  /* build data to be able to lookup the CommandIds of catalog tuples */
1323 
1324  /* setup the initial snapshot */
1325  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1326 
1327  /*
1328  * Decoding needs access to syscaches et al., which in turn use
1329  * heavyweight locks and such. Thus we need to have enough state around to
1330  * keep track of those. The easiest way is to simply use a transaction
1331  * internally. That also allows us to easily enforce that nothing writes
1332  * to the database by checking for xid assignments.
1333  *
1334  * When we're called via the SQL SRF there's already a transaction
1335  * started, so start an explicit subtransaction there.
1336  */
1337  using_subtxn = IsTransactionOrTransactionBlock();
1338 
1339  PG_TRY();
1340  {
1341  ReorderBufferChange *change;
1342  ReorderBufferChange *specinsert = NULL;
1343 
1344  if (using_subtxn)
1345  BeginInternalSubTransaction("replay");
1346  else
1348 
1349  rb->begin(rb, txn);
1350 
1351  iterstate = ReorderBufferIterTXNInit(rb, txn);
1352  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1353  {
1354  Relation relation = NULL;
1355  Oid reloid;
1356 
1357  switch (change->action)
1358  {
1360 
1361  /*
1362  * Confirmation for speculative insertion arrived. Simply
1363  * use as a normal record. It'll be cleaned up at the end
1364  * of INSERT processing.
1365  */
1366  Assert(specinsert->data.tp.oldtuple == NULL);
1367  change = specinsert;
1369 
1370  /* intentionally fall through */
1374  Assert(snapshot_now);
1375 
1376  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1377  change->data.tp.relnode.relNode);
1378 
1379  /*
1380  * Catalog tuple without data, emitted while catalog was
1381  * in the process of being rewritten.
1382  */
1383  if (reloid == InvalidOid &&
1384  change->data.tp.newtuple == NULL &&
1385  change->data.tp.oldtuple == NULL)
1386  goto change_done;
1387  else if (reloid == InvalidOid)
1388  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1389  relpathperm(change->data.tp.relnode,
1390  MAIN_FORKNUM));
1391 
1392  relation = RelationIdGetRelation(reloid);
1393 
1394  if (relation == NULL)
1395  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1396  reloid,
1397  relpathperm(change->data.tp.relnode,
1398  MAIN_FORKNUM));
1399 
1400  if (!RelationIsLogicallyLogged(relation))
1401  goto change_done;
1402 
1403  /*
1404  * Ignore temporary heaps created during DDL unless the
1405  * plugin has asked for them.
1406  */
1407  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1408  goto change_done;
1409 
1410  /*
1411  * For now ignore sequence changes entirely. Most of the
1412  * time they don't log changes using records we
1413  * understand, so it doesn't make sense to handle the few
1414  * cases we do.
1415  */
1416  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1417  goto change_done;
1418 
1419  /* user-triggered change */
1420  if (!IsToastRelation(relation))
1421  {
1422  ReorderBufferToastReplace(rb, txn, relation, change);
1423  rb->apply_change(rb, txn, relation, change);
1424 
1425  /*
1426  * Only clear reassembled toast chunks if we're sure
1427  * they're not required anymore. The creator of the
1428  * tuple tells us.
1429  */
1430  if (change->data.tp.clear_toast_afterwards)
1431  ReorderBufferToastReset(rb, txn);
1432  }
1433  /* we're not interested in toast deletions */
1434  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1435  {
1436  /*
1437  * Need to reassemble the full toasted Datum in
1438  * memory. To ensure the chunks don't get reused till
1439  * we're done, remove it from the list of this
1440  * transaction's changes. Otherwise it will get
1441  * freed/reused while restoring spooled data from
1442  * disk.
1443  */
1444  dlist_delete(&change->node);
1445  ReorderBufferToastAppendChunk(rb, txn, relation,
1446  change);
1447  }
1448 
1449  change_done:
1450 
1451  /*
1452  * Either speculative insertion was confirmed, or it was
1453  * unsuccessful and the record isn't needed anymore.
1454  */
1455  if (specinsert != NULL)
1456  {
1457  ReorderBufferReturnChange(rb, specinsert);
1458  specinsert = NULL;
1459  }
1460 
1461  if (relation != NULL)
1462  {
1463  RelationClose(relation);
1464  relation = NULL;
1465  }
1466  break;
1467 
1469 
1470  /*
1471  * Speculative insertions are dealt with by delaying the
1472  * processing of the insert until the confirmation record
1473  * arrives. For that we simply unlink the record from the
1474  * chain, so it does not get freed/reused while restoring
1475  * spooled data from disk.
1476  *
1477  * This is safe in the face of concurrent catalog changes
1478  * because the relevant relation can't be changed between
1479  * speculative insertion and confirmation due to
1480  * CheckTableNotInUse() and locking.
1481  */
1482 
1483  /* clear out a pending (and thus failed) speculation */
1484  if (specinsert != NULL)
1485  {
1486  ReorderBufferReturnChange(rb, specinsert);
1487  specinsert = NULL;
1488  }
1489 
1490  /* and memorize the pending insertion */
1491  dlist_delete(&change->node);
1492  specinsert = change;
1493  break;
1494 
1496  {
1497  int i;
1498  int nrelids = change->data.truncate.nrelids;
1499  int nrelations = 0;
1500  Relation *relations;
1501 
1502  relations = palloc0(nrelids * sizeof(Relation));
1503  for (i = 0; i < nrelids; i++)
1504  {
1505  Oid relid = change->data.truncate.relids[i];
1506  Relation relation;
1507 
1508  relation = RelationIdGetRelation(relid);
1509 
1510  if (relation == NULL)
1511  elog(ERROR, "could not open relation with OID %u", relid);
1512 
1513  if (!RelationIsLogicallyLogged(relation))
1514  continue;
1515 
1516  relations[nrelations++] = relation;
1517  }
1518 
1519  rb->apply_truncate(rb, txn, nrelations, relations, change);
1520 
1521  for (i = 0; i < nrelations; i++)
1522  RelationClose(relations[i]);
1523 
1524  break;
1525  }
1526 
1528  rb->message(rb, txn, change->lsn, true,
1529  change->data.msg.prefix,
1530  change->data.msg.message_size,
1531  change->data.msg.message);
1532  break;
1533 
1535  /* get rid of the old */
1536  TeardownHistoricSnapshot(false);
1537 
1538  if (snapshot_now->copied)
1539  {
1540  ReorderBufferFreeSnap(rb, snapshot_now);
1541  snapshot_now =
1542  ReorderBufferCopySnap(rb, change->data.snapshot,
1543  txn, command_id);
1544  }
1545 
1546  /*
1547  * Restored from disk, need to be careful not to double
1548  * free. We could introduce refcounting for that, but for
1549  * now this seems infrequent enough not to care.
1550  */
1551  else if (change->data.snapshot->copied)
1552  {
1553  snapshot_now =
1554  ReorderBufferCopySnap(rb, change->data.snapshot,
1555  txn, command_id);
1556  }
1557  else
1558  {
1559  snapshot_now = change->data.snapshot;
1560  }
1561 
1562 
1563  /* and continue with the new one */
1564  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1565  break;
1566 
1568  Assert(change->data.command_id != InvalidCommandId);
1569 
1570  if (command_id < change->data.command_id)
1571  {
1572  command_id = change->data.command_id;
1573 
1574  if (!snapshot_now->copied)
1575  {
1576  /* we don't use the global one anymore */
1577  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1578  txn, command_id);
1579  }
1580 
1581  snapshot_now->curcid = command_id;
1582 
1583  TeardownHistoricSnapshot(false);
1584  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1585 
1586  /*
1587  * Every time the CommandId is incremented, we could
1588  * see new catalog contents, so execute all
1589  * invalidations.
1590  */
1592  }
1593 
1594  break;
1595 
1597  elog(ERROR, "tuplecid value in changequeue");
1598  break;
1599  }
1600  }
1601 
1602  /*
1603  * There's a speculative insertion remaining, just clean it up; it
1604  * can't have been successful, otherwise we'd have gotten a confirmation
1605  * record.
1606  */
1607  if (specinsert)
1608  {
1609  ReorderBufferReturnChange(rb, specinsert);
1610  specinsert = NULL;
1611  }
1612 
1613  /* clean up the iterator */
1614  ReorderBufferIterTXNFinish(rb, iterstate);
1615  iterstate = NULL;
1616 
1617  /* call commit callback */
1618  rb->commit(rb, txn, commit_lsn);
1619 
1620  /* this is just a sanity check against bad output plugin behaviour */
1622  elog(ERROR, "output plugin used XID %u",
1624 
1625  /* cleanup */
1626  TeardownHistoricSnapshot(false);
1627 
1628  /*
1629  * Aborting the current (sub-)transaction as a whole has the right
1630  * semantics. We want all locks acquired in here to be released, not
1631  * reassigned to the parent and we do not want any database access
1632  * have persistent effects.
1633  */
1635 
1636  /* make sure there's no cache pollution */
1638 
1639  if (using_subtxn)
1641 
1642  if (snapshot_now->copied)
1643  ReorderBufferFreeSnap(rb, snapshot_now);
1644 
1645  /* remove potential on-disk data, and deallocate */
1646  ReorderBufferCleanupTXN(rb, txn);
1647  }
1648  PG_CATCH();
1649  {
1650  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1651  if (iterstate)
1652  ReorderBufferIterTXNFinish(rb, iterstate);
1653 
1655 
1656  /*
1657  * Force cache invalidation to happen outside of a valid transaction
1658  * to prevent catalog access as we just caught an error.
1659  */
1661 
1662  /* make sure there's no cache pollution */
1664 
1665  if (using_subtxn)
1667 
1668  if (snapshot_now->copied)
1669  ReorderBufferFreeSnap(rb, snapshot_now);
1670 
1671  /* remove potential on-disk data, and deallocate */
1672  ReorderBufferCleanupTXN(rb, txn);
1673 
1674  PG_RE_THROW();
1675  }
1676  PG_END_TRY();
1677 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:488
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4440
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:580
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:490
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4251
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:434
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:1996
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
void * palloc0(Size size)
Definition: mcxt.c:955
#define InvalidCommandId
Definition: c.h:491
union ReorderBufferChange::@102 data
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
struct ReorderBufferChange::@102::@105 msg
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:699
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2673
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4146
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferApplyTruncateCB apply_truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
int i
#define elog
Definition: elog.h:219
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1906
#define PG_END_TRY()
Definition: elog.h:300
struct ReorderBufferChange::@102::@104 truncate

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 715 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

718 {
719  ReorderBufferTXN *txn;
720  ReorderBufferTXN *subtxn;
721 
722  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
723  InvalidXLogRecPtr, false);
724 
725  /*
726  * No need to do anything if that subtxn didn't contain any changes
727  */
728  if (!subtxn)
729  return;
730 
731  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
732 
733  if (txn == NULL)
734  elog(ERROR, "subxact logged without previous toplevel record");
735 
736  /*
737  * Pass our base snapshot to the parent transaction if it doesn't have
738  * one, or ours is older. That can happen if there are no changes in the
739  * toplevel transaction but in one of the child transactions. This allows
740  * the parent to simply use its base snapshot initially.
741  */
742  if (subtxn->base_snapshot != NULL &&
743  (txn->base_snapshot == NULL ||
744  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
745  {
746  txn->base_snapshot = subtxn->base_snapshot;
747  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
748  subtxn->base_snapshot = NULL;
750  }
751 
752  subtxn->final_lsn = commit_lsn;
753  subtxn->end_lsn = end_lsn;
754 
755  if (!subtxn->is_known_as_subxact)
756  {
757  subtxn->is_known_as_subxact = true;
758  Assert(subtxn->nsubtxns == 0);
759 
760  /* remove from lsn order list of top-level transactions */
761  dlist_delete(&subtxn->node);
762 
763  /* add to subtransaction list */
764  dlist_push_tail(&txn->subtxns, &subtxn->node);
765  txn->nsubtxns++;
766  }
767 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:699
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1201 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1203 {
1204  Snapshot snap;
1205  dlist_iter iter;
1206  int i = 0;
1207  Size size;
1208 
1209  size = sizeof(SnapshotData) +
1210  sizeof(TransactionId) * orig_snap->xcnt +
1211  sizeof(TransactionId) * (txn->nsubtxns + 1);
1212 
1213  snap = MemoryContextAllocZero(rb->context, size);
1214  memcpy(snap, orig_snap, sizeof(SnapshotData));
1215 
1216  snap->copied = true;
1217  snap->active_count = 1; /* mark as active so nobody frees it */
1218  snap->regd_count = 0;
1219  snap->xip = (TransactionId *) (snap + 1);
1220 
1221  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1222 
1223  /*
1224  * snap->subxip contains all txids that belong to our transaction which we
1225  * need to check via cmin/cmax. That's why we store the toplevel
1226  * transaction in there as well.
1227  */
1228  snap->subxip = snap->xip + snap->xcnt;
1229  snap->subxip[i++] = txn->xid;
1230 
1231  /*
1232  * nsubtxns isn't decreased when subtransactions abort, so count manually.
1233  * Since it's an upper boundary it is safe to use it for the allocation
1234  * above.
1235  */
1236  snap->subxcnt = 1;
1237 
1238  dlist_foreach(iter, &txn->subtxns)
1239  {
1240  ReorderBufferTXN *sub_txn;
1241 
1242  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1243  snap->subxip[i++] = sub_txn->xid;
1244  snap->subxcnt++;
1245  }
1246 
1247  /* sort so we can bsearch() later */
1248  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1249 
1250  /* store the specified current CommandId */
1251  snap->curcid = cid;
1252 
1253  return snap;
1254 }
uint32 TransactionId
Definition: c.h:474
bool copied
Definition: snapshot.h:96
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:110
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:79
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
CommandId curcid
Definition: snapshot.h:98
size_t Size
Definition: c.h:433
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:80
int i
#define qsort(a, b, c, d)
Definition: port.h:421
TransactionId * subxip
Definition: snapshot.h:91
uint32 active_count
Definition: snapshot.h:109
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:92

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1971 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

1972 {
1973  int i;
1974 
1975  for (i = 0; i < txn->ninvalidations; i++)
1976  LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1977 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1776 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1777 {
1778  ReorderBufferTXN *txn;
1779 
1780  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1781  false);
1782 
1783  /* unknown, nothing to forget */
1784  if (txn == NULL)
1785  return;
1786 
1787  /* cosmetic... */
1788  txn->final_lsn = lsn;
1789 
1790  /*
1791  * Process cache invalidation messages if there are any. Even if we're not
1792  * interested in the transaction's contents, it could have manipulated the
1793  * catalog and we need to update the caches according to that.
1794  */
1795  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1796  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1797  txn->invalidations);
1798  else
1799  Assert(txn->ninvalidations == 0);
1800 
1801  /* remove potential on-disk data, and deallocate */
1802  ReorderBufferCleanupTXN(rb, txn);
1803 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:699
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 289 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

290 {
291  MemoryContext context = rb->context;
292 
293  /*
294  * We free separately allocated data by entirely scrapping reorderbuffer's
295  * memory context.
296  */
297  MemoryContextDelete(context);
298 
299  /* Free disk space used by unconsumed reorder buffers */
300  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
301 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:120
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:576
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1260 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1261 {
1262  if (snap->copied)
1263  pfree(snap);
1264  else
1265  SnapBuildSnapDecRefcount(snap);
1266 }
bool copied
Definition: snapshot.h:96
void pfree(void *pointer)
Definition: mcxt.c:1031
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 360 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

361 {
362  ReorderBufferChange *change;
363 
364  change = (ReorderBufferChange *)
365  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
366 
367  memset(change, 0, sizeof(ReorderBufferChange));
368  return change;
369 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 647 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

648 {
649  ReorderBufferTXN *txn;
650 
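651  if (dlist_is_empty(&rb->toplevel_by_lsn))
652  return NULL;
653 
654  AssertTXNLsnOrder(rb);
655 
656  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
657 
658  Assert(!txn->is_known_as_subxact);
659  Assert(txn->first_lsn != InvalidXLogRecPtr);
660  return txn;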
661 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 430 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

431 {
432  ReorderBufferTupleBuf *tuple;
433  Size alloc_len;
434 
435  alloc_len = tuple_len + SizeofHeapTupleHeader;
436 
437  tuple = (ReorderBufferTupleBuf *)
438  MemoryContextAlloc(rb->tup_context,
439  sizeof(ReorderBufferTupleBuf) +
440  MAXIMUM_ALIGNOF + alloc_len);
441  tuple->alloc_tuple_size = alloc_len;
442  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
443 
444  return tuple;
445 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:183
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:433
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 307 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

308 {
309  ReorderBufferTXN *txn;
310 
311  txn = (ReorderBufferTXN *)
312  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
313 
314  memset(txn, 0, sizeof(ReorderBufferTXN));
315 
316  dlist_init(&txn->changes);
317  dlist_init(&txn->tuplecids);
318  dlist_init(&txn->subtxns);
319 
320  return txn;
321 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1812 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1814 {
1815  bool use_subtxn = IsTransactionOrTransactionBlock();
1816  int i;
1817 
1818  if (use_subtxn)
1819  BeginInternalSubTransaction("replay");
1820 
1821  /*
1822  * Force invalidations to happen outside of a valid transaction - that way
1823  * entries will just be marked as invalid without accessing the catalog.
1824  * That's advantageous because we don't need to setup the full state
1825  * necessary for catalog access.
1826  */
1827  if (use_subtxn)
1828  AbortCurrentTransaction();
1829 
1830  for (i = 0; i < ninvalidations; i++)
1831  LocalExecuteInvalidationMessage(&invalidations[i]);
1832 
1833  if (use_subtxn)
1834  RollbackAndReleaseCurrentSubTransaction();
1835 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4440
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4251
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4146
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 786 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

787 {
788  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
789  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
790  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
791 
792  if (pos_a < pos_b)
793  return 1;
794  else if (pos_a == pos_b)
795  return 0;
796  return -1;
797 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:457
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1013 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1015 {
1016  int32 off;
1017 
1018  for (off = 0; off < state->nr_txns; off++)
1019  {
1020  if (state->entries[off].fd != -1)
1021  CloseTransientFile(state->entries[off].fd);
1022  }
1023 
1024  /* free memory we might have "leaked" in the last *Next call */
1025  if (!dlist_is_empty(&state->old_change))
1026  {
1027  ReorderBufferChange *change;
1028 
1029  change = dlist_container(ReorderBufferChange, node,
1030  dlist_pop_head_node(&state->old_change));
1031  ReorderBufferReturnChange(rb, change);
1032  Assert(dlist_is_empty(&state->old_change));
1033  }
1034 
1035  binaryheap_free(state->heap);
1036  pfree(state);
1037 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:313
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
int CloseTransientFile(int fd)
Definition: fd.c:2556
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 804 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

805 {
806  Size nr_txns = 0;
807  ReorderBufferIterTXNState *state;
808  dlist_iter cur_txn_i;
809  int32 off;
810 
811  /*
812  * Calculate the size of our heap: one element for every transaction that
813  * contains changes. (Besides the transactions already in the reorder
814  * buffer, we count the one we were directly passed.)
815  */
816  if (txn->nentries > 0)
817  nr_txns++;
818 
819  dlist_foreach(cur_txn_i, &txn->subtxns)
820  {
821  ReorderBufferTXN *cur_txn;
822 
823  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
824 
825  if (cur_txn->nentries > 0)
826  nr_txns++;
827  }
828 
829  /*
830  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
831  * need to allocate/build a heap then.
832  */
833 
834  /* allocate iteration state */
835  state = (ReorderBufferIterTXNState *)
836  MemoryContextAllocZero(rb->context,
837  sizeof(ReorderBufferIterTXNState) +
838  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
839 
840  state->nr_txns = nr_txns;
841  dlist_init(&state->old_change);
842 
843  for (off = 0; off < state->nr_txns; off++)
844  {
845  state->entries[off].fd = -1;
846  state->entries[off].segno = 0;
847  }
848 
849  /* allocate heap */
850  state->heap = binaryheap_allocate(state->nr_txns,
851  ReorderBufferIterCompare,
852  state);
853 
854  /*
855  * Now insert items into the binary heap, in an unordered fashion. (We
856  * will run a heap assembly step at the end; this is more efficient.)
857  */
858 
859  off = 0;
860 
861  /* add toplevel transaction if it contains changes */
862  if (txn->nentries > 0)
863  {
864  ReorderBufferChange *cur_change;
865 
866  if (txn->serialized)
867  {
868  /* serialize remaining changes */
869  ReorderBufferSerializeTXN(rb, txn);
870  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
871  &state->entries[off].segno);
872  }
873 
874  cur_change = dlist_head_element(ReorderBufferChange, node,
875  &txn->changes);
876 
877  state->entries[off].lsn = cur_change->lsn;
878  state->entries[off].change = cur_change;
879  state->entries[off].txn = txn;
880 
881  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
882  }
883 
884  /* add subtransactions if they contain changes */
885  dlist_foreach(cur_txn_i, &txn->subtxns)
886  {
887  ReorderBufferTXN *cur_txn;
888 
889  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
890 
891  if (cur_txn->nentries > 0)
892  {
893  ReorderBufferChange *cur_change;
894 
895  if (cur_txn->serialized)
896  {
897  /* serialize remaining changes */
898  ReorderBufferSerializeTXN(rb, cur_txn);
899  ReorderBufferRestoreChanges(rb, cur_txn,
900  &state->entries[off].fd,
901  &state->entries[off].segno);
902  }
903  cur_change = dlist_head_element(ReorderBufferChange, node,
904  &cur_txn->changes);
905 
906  state->entries[off].lsn = cur_change->lsn;
907  state->entries[off].change = cur_change;
908  state->entries[off].txn = cur_txn;
909 
910  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
911  }
912  }
913 
914  /* assemble a valid binary heap */
915  binaryheap_build(state->heap);
916 
917  return state;
918 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:313
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:433
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:464

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 927 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

928 {
929  ReorderBufferChange *change;
930  ReorderBufferIterTXNEntry *entry;
931  int32 off;
932 
933  /* nothing there anymore */
934  if (state->heap->bh_size == 0)
935  return NULL;
936 
937  off = DatumGetInt32(binaryheap_first(state->heap));
938  entry = &state->entries[off];
939 
940  /* free memory we might have "leaked" in the previous *Next call */
941  if (!dlist_is_empty(&state->old_change))
942  {
943  change = dlist_container(ReorderBufferChange, node,
944  dlist_pop_head_node(&state->old_change));
945  ReorderBufferReturnChange(rb, change);
946  Assert(dlist_is_empty(&state->old_change));
947  }
948 
949  change = entry->change;
950 
951  /*
952  * update heap with information about which transaction has the next
953  * relevant change in LSN order
954  */
955 
956  /* there are in-memory changes */
957  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
958  {
959  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
960  ReorderBufferChange *next_change =
961  dlist_container(ReorderBufferChange, node, next);
962 
963  /* txn stays the same */
964  state->entries[off].lsn = next_change->lsn;
965  state->entries[off].change = next_change;
966 
967  binaryheap_replace_first(state->heap, Int32GetDatum(off));
968  return change;
969  }
970 
971  /* try to load changes from disk */
972  if (entry->txn->nentries != entry->txn->nentries_mem)
973  {
974  /*
975  * Ugly: restoring changes will reuse *Change records, thus delete the
976  * current one from the per-tx list and only free in the next call.
977  */
978  dlist_delete(&change->node);
979  dlist_push_tail(&state->old_change, &change->node);
980 
981  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
982  &state->entries[off].segno))
983  {
984  /* successfully restored changes from disk */
985  ReorderBufferChange *next_change =
986  dlist_head_element(ReorderBufferChange, node,
987  &entry->txn->changes);
988 
989  elog(DEBUG2, "restored %u/%u changes from disk",
990  (uint32) entry->txn->nentries_mem,
991  (uint32) entry->txn->nentries);
992 
993  Assert(entry->txn->nentries_mem);
994  /* txn stays the same */
995  state->entries[off].lsn = next_change->lsn;
996  state->entries[off].change = next_change;
997  binaryheap_replace_first(state->heap, Int32GetDatum(off));
998 
999  return change;
1000  }
1001  }
1002 
1003  /* ok, no changes there anymore, remove */
1004  binaryheap_remove_first(state->heap);
1005 
1006  return change;
1007 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:211
#define DatumGetInt32(X)
Definition: postgres.h:457
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:313
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:325
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:464
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define elog
Definition: elog.h:219

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1848 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1849 {
1850  /* many records won't have an xid assigned, centralize check here */
1851  if (xid != InvalidTransactionId)
1852  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1853 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 549 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

551 {
552  ReorderBufferTXN *txn;
553 
554  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
555 
556  change->lsn = lsn;
557  Assert(InvalidXLogRecPtr != lsn);
558  dlist_push_tail(&txn->changes, &change->node);
559  txn->nentries++;
560  txn->nentries_mem++;
561 
562  ReorderBufferCheckSerializeTXN(rb, txn);
563 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:699
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 569 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

573 {
574  if (transactional)
575  {
576  MemoryContext oldcontext;
577  ReorderBufferChange *change;
578 
579  Assert(xid != InvalidTransactionId);
580 
581  oldcontext = MemoryContextSwitchTo(rb->context);
582 
583  change = ReorderBufferGetChange(rb);
584  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
585  change->data.msg.prefix = pstrdup(prefix);
586  change->data.msg.message_size = message_size;
587  change->data.msg.message = palloc(message_size);
588  memcpy(change->data.msg.message, message, message_size);
589 
590  ReorderBufferQueueChange(rb, xid, lsn, change);
591 
592  MemoryContextSwitchTo(oldcontext);
593  }
594  else
595  {
596  ReorderBufferTXN *txn = NULL;
597  volatile Snapshot snapshot_now = snapshot;
598 
599  if (xid != InvalidTransactionId)
600  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
601 
602  /* setup snapshot to allow catalog access */
603  SetupHistoricSnapshot(snapshot_now, NULL);
604  PG_TRY();
605  {
606  rb->message(rb, txn, lsn, false, prefix, message_size, message);
607 
608  TeardownHistoricSnapshot(false);
609  }
610  PG_CATCH();
611  {
612  TeardownHistoricSnapshot(true);
613  PG_RE_THROW();
614  }
615  PG_END_TRY();
616  }
617 }
char * pstrdup(const char *in)
Definition: mcxt.c:1161
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@105 msg
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:699
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:924
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2453 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2455 {
2456  ReorderBufferDiskChange *ondisk;
2457  ReorderBufferChange *change;
2458 
2459  ondisk = (ReorderBufferDiskChange *) data;
2460 
2461  change = ReorderBufferGetChange(rb);
2462 
2463  /* copy static part */
2464  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2465 
2466  data += sizeof(ReorderBufferDiskChange);
2467 
2468  /* restore individual stuff */
2469  switch (change->action)
2470  {
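2471  /* fall through these, they're all similar enough */
2472  case REORDER_BUFFER_CHANGE_INSERT:
2473  case REORDER_BUFFER_CHANGE_UPDATE:
2474  case REORDER_BUFFER_CHANGE_DELETE:
2475  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: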
2476  if (change->data.tp.oldtuple)
2477  {
2478  uint32 tuplelen = ((HeapTuple) data)->t_len;
2479 
2480  change->data.tp.oldtuple =
2481  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2482 
2483  /* restore ->tuple */
2484  memcpy(&change->data.tp.oldtuple->tuple, data,
2485  sizeof(HeapTupleData));
2486  data += sizeof(HeapTupleData);
2487 
2488  /* reset t_data pointer into the new tuplebuf */
2489  change->data.tp.oldtuple->tuple.t_data =
2490  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2491 
2492  /* restore tuple data itself */
2493  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2494  data += tuplelen;
2495  }
2496 
2497  if (change->data.tp.newtuple)
2498  {
2499  /* here, data might not be suitably aligned! */
2500  uint32 tuplelen;
2501 
2502  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2503  sizeof(uint32));
2504 
2505  change->data.tp.newtuple =
2506  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2507 
2508  /* restore ->tuple */
2509  memcpy(&change->data.tp.newtuple->tuple, data,
2510  sizeof(HeapTupleData));
2511  data += sizeof(HeapTupleData);
2512 
2513  /* reset t_data pointer into the new tuplebuf */
2514  change->data.tp.newtuple->tuple.t_data =
2515  ReorderBufferTupleBufData(change->data.tp.newtuple);
2516 
2517  /* restore tuple data itself */
2518  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2519  data += tuplelen;
2520  }
2521 
2522  break;
2523  case REORDER_BUFFER_CHANGE_MESSAGE:
2524  {
2525  Size prefix_size;
2526 
2527  /* read prefix */
2528  memcpy(&prefix_size, data, sizeof(Size));
2529  data += sizeof(Size);
2530  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2531  prefix_size);
2532  memcpy(change->data.msg.prefix, data, prefix_size);
2533  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2534  data += prefix_size;
2535 
2536  /* read the message */
2537  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2538  data += sizeof(Size);
2539  change->data.msg.message = MemoryContextAlloc(rb->context,
2540  change->data.msg.message_size);
2541  memcpy(change->data.msg.message, data,
2542  change->data.msg.message_size);
2543  data += change->data.msg.message_size;
2544 
2545  break;
2546  }
2547  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2548  {
2549  Snapshot oldsnap;
2550  Snapshot newsnap;
2551  Size size;
2552 
2553  oldsnap = (Snapshot) data;
2554 
2555  size = sizeof(SnapshotData) +
2556  sizeof(TransactionId) * oldsnap->xcnt +
2557  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2558 
2559  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2560 
2561  newsnap = change->data.snapshot;
2562 
2563  memcpy(newsnap, data, size);
2564  newsnap->xip = (TransactionId *)
2565  (((char *) newsnap) + sizeof(SnapshotData));
2566  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2567  newsnap->copied = true;
2568  break;
2569  }
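2570  /* the base struct contains all the data, easy peasy */
2571  case REORDER_BUFFER_CHANGE_TRUNCATE:
2572  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2573  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2574  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: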
2575  break;
2576  }
2577 
2578  dlist_push_tail(&txn->changes, &change->node);
2579  txn->nentries_mem++;
2580 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:183
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:474
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:23
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
struct ReorderBufferChange::@102::@103 tp
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:325
TransactionId * xip
Definition: snapshot.h:79
MemoryContext context
union ReorderBufferChange::@102 data
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
struct ReorderBufferChange::@102::@105 msg
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:699
size_t Size
Definition: c.h:433
uint32 xcnt
Definition: snapshot.h:80
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:622
int32 subxcnt
Definition: snapshot.h:92
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2322 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2324 {
2325  Size restored = 0;
2326  XLogSegNo last_segno;
2327  dlist_mutable_iter cleanup_iter;
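2328 
2329  Assert(txn->first_lsn != InvalidXLogRecPtr);
2330  Assert(txn->final_lsn != InvalidXLogRecPtr);
2331 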
2332  /* free current entries, so we have memory for more */
2333  dlist_foreach_modify(cleanup_iter, &txn->changes)
2334  {
2335  ReorderBufferChange *cleanup =
2336  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2337 
2338  dlist_delete(&cleanup->node);
2339  ReorderBufferReturnChange(rb, cleanup);
2340  }
2341  txn->nentries_mem = 0;
2342  Assert(dlist_is_empty(&txn->changes));
2343 
2344  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2345 
2346  while (restored < max_changes_in_memory && *segno <= last_segno)
2347  {
2348  int readBytes;
2349  ReorderBufferDiskChange *ondisk;
2350 
2351  if (*fd == -1)
2352  {
2353  char path[MAXPGPATH];
2354 
2355  /* first time in */
2356  if (*segno == 0)
2357  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2358 
2359  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2360 
2361  /*
2362  * No need to care about TLIs here, only used during a single run,
2363  * so each LSN only maps to a specific WAL record.
2364  */
2365  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2366  *segno);
2367 
2368  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2369  if (*fd < 0 && errno == ENOENT)
2370  {
2371  *fd = -1;
2372  (*segno)++;
2373  continue;
2374  }
2375  else if (*fd < 0)
2376  ereport(ERROR,
2377  (errcode_for_file_access(),
2378  errmsg("could not open file \"%s\": %m",
2379  path)));
2380  }
2381 
2382  /*
2383  * Read the statically sized part of a change which has information
2384  * about the total size. If we couldn't read a record, we're at the
2385  * end of this file.
2386  */
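2387  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2388  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2389  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2390  pgstat_report_wait_end();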
2391 
2392  /* eof */
2393  if (readBytes == 0)
2394  {
2395  CloseTransientFile(*fd);
2396  *fd = -1;
2397  (*segno)++;
2398  continue;
2399  }
2400  else if (readBytes < 0)
2401  ereport(ERROR,
2402  (errcode_for_file_access(),
2403  errmsg("could not read from reorderbuffer spill file: %m")));
2404  else if (readBytes != sizeof(ReorderBufferDiskChange))
2405  ereport(ERROR,
2406  (errcode_for_file_access(),
2407  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2408  readBytes,
2409  (uint32) sizeof(ReorderBufferDiskChange))));
2410 
2411  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2412 
2413  ReorderBufferSerializeReserve(rb,
2414  sizeof(ReorderBufferDiskChange) + ondisk->size);
2415  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2416 
2417  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2418  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2419  ondisk->size - sizeof(ReorderBufferDiskChange));
2420  pgstat_report_wait_end();
2421 
2422  if (readBytes < 0)
2423  ereport(ERROR,
2424  (errcode_for_file_access(),
2425  errmsg("could not read from reorderbuffer spill file: %m")));
2426  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2427  ereport(ERROR,
2428  (errcode_for_file_access(),
2429  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2430  readBytes,
2431  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2432 
2433  /*
2434  * ok, read a full change from disk, now restore it into proper
2435  * in-memory format
2436  */
2437  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2438  restored++;
2439  }
2440 
2441  return restored;
2442 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:113
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2386
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:325
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2556
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:875
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:433
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:797
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:13
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2586 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN().

2587 {
      /*
       * Remove the on-disk files of txn: one candidate file name is tried for
       * every WAL segment number from the segment containing txn->first_lsn up
       * to and including the segment containing txn->final_lsn.
       *
       * NOTE(review): this listing skips source lines 2592-2593 and 2603. The
       * statement that fills "path" is not visible; presumably it is the
       * ReorderBufferSerializedPath() call named in the References list --
       * confirm against the full source.
       */
2588  XLogSegNo first;
2589  XLogSegNo cur;
2590  XLogSegNo last;
2591 
2594 
      /* translate both LSN bounds of the transaction into segment numbers */
2595  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2596  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2597 
2598  /* iterate over all possible filenames, and delete them */
2599  for (cur = first; cur <= last; cur++)
2600  {
2601  char path[MAXPGPATH];
2602 
      /*
       * A file that does not exist (ENOENT) is silently tolerated; any other
       * unlink() failure is reported at ERROR level.
       */
2604  if (unlink(path) != 0 && errno != ENOENT)
2605  ereport(ERROR,
2607  errmsg("could not remove file \"%s\": %m", path)));
2608  }
2609 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:113
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 378 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

379 {
      /*
       * Release a change: first whatever payload hangs off change->data
       * (selected by change->action), then the ReorderBufferChange itself.
       * Every payload pointer is reset to NULL right after it is released.
       *
       * NOTE(review): the "case" labels of the switch (source lines 383-386,
       * 399, 407 and 415-418) are missing from this listing, so which action
       * values reach which arm cannot be read off here; the candidates are the
       * REORDER_BUFFER_CHANGE_* constants in the References list.
       */
380  /* free contained data */
381  switch (change->action)
382  {
      /* arm 1: changes carrying tuples in data.tp; each present tuple buffer is returned */
387  if (change->data.tp.newtuple)
388  {
389  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
390  change->data.tp.newtuple = NULL;
391  }
392 
393  if (change->data.tp.oldtuple)
394  {
395  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
396  change->data.tp.oldtuple = NULL;
397  }
398  break;
      /* arm 2: changes carrying data.msg; prefix and message are pfree'd if set */
400  if (change->data.msg.prefix != NULL)
401  pfree(change->data.msg.prefix);
402  change->data.msg.prefix = NULL;
403  if (change->data.msg.message != NULL)
404  pfree(change->data.msg.message);
405  change->data.msg.message = NULL;
406  break;
      /* arm 3: changes carrying data.snapshot; released through ReorderBufferFreeSnap() */
408  if (change->data.snapshot)
409  {
410  ReorderBufferFreeSnap(rb, change->data.snapshot);
411  change->data.snapshot = NULL;
412  }
413  break;
414  /* no data in addition to the struct itself */
419  break;
420  }
421 
      /* finally release the change struct itself */
422  pfree(change);
423 }
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
struct ReorderBufferChange::@102::@103 tp
void pfree(void *pointer)
Definition: mcxt.c:1031
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@105 msg
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 454 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

455 {
      /*
       * Release a tuple buffer by handing it to pfree(). The rb argument is
       * not used by the code shown here.
       */
456  pfree(tuple);
457 }
void pfree(void *pointer)
Definition: mcxt.c:1031

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 330 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

331 {
      /*
       * Release a ReorderBufferTXN: drop it from rb's one-entry lookup cache
       * if it is the cached entry, release the data it contains, then pfree()
       * the struct itself.
       *
       * NOTE(review): source lines 335 and 343 are missing from this listing.
       * Judging by the References list they presumably reset
       * rb->by_txn_last_xid to InvalidTransactionId and call hash_destroy()
       * on txn->tuplecid_hash -- confirm against the full source.
       */
332  /* clean the lookup cache if we were cached (quite likely) */
333  if (rb->by_txn_last_xid == txn->xid)
334  {
336  rb->by_txn_last_txn = NULL;
337  }
338 
339  /* free data that's contained */
340 
341  if (txn->tuplecid_hash != NULL)
342  {
344  txn->tuplecid_hash = NULL;
345  }
346 
      /* the invalidations array is only freed when one was attached */
347  if (txn->invalidations)
348  {
349  pfree(txn->invalidations);
350  txn->invalidations = NULL;
351  }
352 
353  pfree(txn);
354 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1031
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2157 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2159 {
      /*
       * Serialize one change into rb->outbuf and write it to fd as a single
       * record: a ReorderBufferDiskChange header (a raw copy of the
       * ReorderBufferChange struct plus the total record size) followed
       * directly by an action-specific payload.
       *
       * "sz" accumulates the total record size; it starts at the header size
       * and each switch arm adds the size of what it appends.
       *
       * NOTE(review): this listing omits a number of source lines, among them
       * every "case" label of the switch and the statements following the
       * "make sure we have enough space" comments. The latter are presumably
       * the ReorderBufferSerializeReserve() calls named in the References list
       * (each arm re-reads rb->outbuf afterwards because the buffer "might
       * have been reallocated") -- confirm against the full source.
       */
2160  ReorderBufferDiskChange *ondisk;
2161  Size sz = sizeof(ReorderBufferDiskChange);
2162 
2164 
      /* copy the fixed-size change struct verbatim into the header */
2165  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2166  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2167 
2168  switch (change->action)
2169  {
2170  /* fall through these, they're all similar enough */
2175  {
      /*
       * Tuple-carrying changes. Payload layout, each part optional:
       * [HeapTupleData of old tuple][old tuple data]
       * [HeapTupleData of new tuple][new tuple data]
       */
2176  char *data;
2177  ReorderBufferTupleBuf *oldtup,
2178  *newtup;
2179  Size oldlen = 0;
2180  Size newlen = 0;
2181 
2182  oldtup = change->data.tp.oldtuple;
2183  newtup = change->data.tp.newtuple;
2184 
2185  if (oldtup)
2186  {
2187  sz += sizeof(HeapTupleData);
2188  oldlen = oldtup->tuple.t_len;
2189  sz += oldlen;
2190  }
2191 
2192  if (newtup)
2193  {
2194  sz += sizeof(HeapTupleData);
2195  newlen = newtup->tuple.t_len;
2196  sz += newlen;
2197  }
2198 
2199  /* make sure we have enough space */
2201 
2202  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2203  /* might have been reallocated above */
2204  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2205 
      /*
       * NOTE(review): space is reserved whenever the tuple pointer is set,
       * but the copy below is keyed on the length being non-zero. A tuple
       * with t_len == 0 would be counted in sz without being written --
       * presumably that cannot happen; confirm.
       */
2206  if (oldlen)
2207  {
2208  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2209  data += sizeof(HeapTupleData);
2210 
2211  memcpy(data, oldtup->tuple.t_data, oldlen);
2212  data += oldlen;
2213  }
2214 
2215  if (newlen)
2216  {
2217  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2218  data += sizeof(HeapTupleData);
2219 
2220  memcpy(data, newtup->tuple.t_data, newlen);
2221  data += newlen;
2222  }
2223  break;
2224  }
2226  {
      /*
       * Message changes. Payload layout:
       * [Size prefix_size][prefix bytes incl. terminating NUL]
       * [Size message_size][message bytes]
       * strlen() is applied to data.msg.prefix unconditionally, so the
       * prefix is assumed to be non-NULL here.
       */
2227  char *data;
2228  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2229 
2230  sz += prefix_size + change->data.msg.message_size +
2231  sizeof(Size) + sizeof(Size);
2233 
2234  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2235 
2236  /* might have been reallocated above */
2237  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2238 
2239  /* write the prefix including the size */
2240  memcpy(data, &prefix_size, sizeof(Size));
2241  data += sizeof(Size);
2242  memcpy(data, change->data.msg.prefix,
2243  prefix_size);
2244  data += prefix_size;
2245 
2246  /* write the message including the size */
2247  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2248  data += sizeof(Size);
2249  memcpy(data, change->data.msg.message,
2250  change->data.msg.message_size);
2251  data += change->data.msg.message_size;
2252 
2253  break;
2254  }
2256  {
      /*
       * Snapshot changes. Payload layout:
       * [SnapshotData][xcnt TransactionIds from xip]
       * [subxcnt TransactionIds from subxip]
       */
2257  Snapshot snap;
2258  char *data;
2259 
2260  snap = change->data.snapshot;
2261 
2262  sz += sizeof(SnapshotData) +
2263  sizeof(TransactionId) * snap->xcnt +
2264  sizeof(TransactionId) * snap->subxcnt
2265  ;
2266 
2267  /* make sure we have enough space */
2269  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2270  /* might have been reallocated above */
2271  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2272 
2273  memcpy(data, snap, sizeof(SnapshotData));
2274  data += sizeof(SnapshotData);
2275 
2276  if (snap->xcnt)
2277  {
2278  memcpy(data, snap->xip,
2279  sizeof(TransactionId) * snap->xcnt);
2280  data += sizeof(TransactionId) * snap->xcnt;
2281  }
2282 
2283  if (snap->subxcnt)
2284  {
2285  memcpy(data, snap->subxip,
2286  sizeof(TransactionId) * snap->subxcnt);
2287  data += sizeof(TransactionId) * snap->subxcnt;
2288  }
2289  break;
2290  }
2295  /* ReorderBufferChange contains everything important */
2296  break;
2297  }
2298 
      /* record the final size in the header so a reader knows how much follows */
2299  ondisk->size = sz;
2300 
      /*
       * A short write is treated like a failed write. errno is saved before
       * and restored after the elided cleanup statement (presumably
       * CloseTransientFile(), per the References list) so that %m in the
       * error message reports the write() failure.
       */
2302  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2303  {
2304  int save_errno = errno;
2305 
2307  errno = save_errno;
2308  ereport(ERROR,
2310  errmsg("could not write to data file for XID %u: %m",
2311  txn->xid)));
2312  }
2314 
2315  Assert(ondisk->change.action == change->action);
2316 }
uint32 TransactionId
Definition: c.h:474
#define write(a, b, c)
Definition: win32.h:14
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:68
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId * xip
Definition: snapshot.h:79
int CloseTransientFile(int fd)
Definition: fd.c:2556
union ReorderBufferChange::@102 data
TransactionId xid
struct ReorderBufferChange::@102::@105 msg
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:699
size_t Size
Definition: c.h:433
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:80
int errmsg(const char *fmt,...)
Definition: elog.c:797
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:91
int32 subxcnt
Definition: snapshot.h:92

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 2655 of file reorderbuffer.c.

References ReplicationSlot::data, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf(), wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

2657 {
      /*
       * Build into "path" (at most MAXPGPATH bytes) a file name of the form
       * "pg_replslot/<%s>/xid-<xid>-lsn-<hi>-<lo>.snap", where <hi>/<lo> are
       * the upper and lower 32 bits, in hex, of the LSN at offset 0 of
       * segment "segno".
       *
       * NOTE(review): source line 2663, which supplies the argument for the
       * "%s" directory component, is missing from this listing. Presumably it
       * is the slot name (NameStr and MyReplicationSlot appear in the
       * References list); whether the "slot" parameter is used at all cannot
       * be told from the lines shown.
       */
2658  XLogRecPtr recptr;
2659 
      /* LSN of the first byte of the segment */
2660  XLogSegNoOffsetToRecPtr(segno, 0, recptr, wal_segment_size);
2661 
2662  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2664  xid,
2665  (uint32) (recptr >> 32), (uint32) recptr);
2666 }
int wal_segment_size
Definition: xlog.c:113
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
ReplicationSlotPersistentData data
Definition: slot.h:120
#define MAXPGPATH
#define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes)
unsigned int uint32
Definition: c.h:325
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define NameStr(name)
Definition: c.h:576

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2044 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2045 {
      /*
       * Make sure rb->outbuf can hold at least sz bytes.
       *
       * - No buffer recorded yet (outbufsize == 0): allocate sz bytes in
       *   rb->context.
       * - Buffer smaller than sz: grow it with repalloc(), which may move it,
       *   so callers must re-read rb->outbuf afterwards.
       * - Otherwise: nothing to do; the buffer is never shrunk.
       */
2046  if (!rb->outbufsize)
2047  {
2048  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2049  rb->outbufsize = sz;
2050  }
2051  else if (rb->outbufsize < sz)
2052  {
2053  rb->outbuf = repalloc(rb->outbuf, sz);
2054  rb->outbufsize = sz;
2055  }
2056 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1044
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2079 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

2080 {
      /*
       * Write all in-memory changes of txn, and recursively of each of its
       * subtransactions, to disk and release them from memory. Changes are
       * grouped into one file per WAL segment, chosen by the segment that
       * contains the change's LSN. On return txn->changes is empty,
       * txn->nentries_mem is 0 and txn->serialized is true.
       *
       * NOTE(review): source lines 2124 and 2133 are missing from this
       * listing. Line 2124 presumably begins the ReorderBufferSerializedPath()
       * call that fills "path" (only its last argument, curOpenSegNo, is
       * visible) -- confirm against the full source.
       */
2081  dlist_iter subtxn_i;
2082  dlist_mutable_iter change_i;
2083  int fd = -1;
2084  XLogSegNo curOpenSegNo = 0;
2085  Size spilled = 0;
2086 
2087  elog(DEBUG2, "spill %u changes in XID %u to disk",
2088  (uint32) txn->nentries_mem, txn->xid);
2089 
2090  /* do the same to all child TXs */
2091  dlist_foreach(subtxn_i, &txn->subtxns)
2092  {
2093  ReorderBufferTXN *subtxn;
2094 
2095  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2096  ReorderBufferSerializeTXN(rb, subtxn);
2097  }
2098 
2099  /* serialize changestream */
      /* the modify-safe iterator is needed because each change is unlinked inside the loop */
2100  dlist_foreach_modify(change_i, &txn->changes)
2101  {
2102  ReorderBufferChange *change;
2103 
2104  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2105 
2106  /*
2107  * Store the change in the segment file it belongs to by start LSN; a
2108  * single change is never split over multiple segment files.
2109  */
      /* (re)open a file when none is open yet or the change falls outside the current segment */
2110  if (fd == -1 ||
2111  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2112  {
2113  char path[MAXPGPATH];
2114 
2115  if (fd != -1)
2116  CloseTransientFile(fd);
2117 
2118  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2119 
2120  /*
2121  * No need to care about TLIs here, only used during a single run,
2122  * so each LSN only maps to a specific WAL record.
2123  */
2125  curOpenSegNo);
2126 
2127  /* open segment, create it if necessary */
      /* O_APPEND: an already existing file for this segment is extended, not overwritten */
2128  fd = OpenTransientFile(path,
2129  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2130 
2131  if (fd < 0)
2132  ereport(ERROR,
2134  errmsg("could not open file \"%s\": %m", path)));
2135  }
2136 
      /* write the change, then unlink it from the list and release its memory */
2137  ReorderBufferSerializeChange(rb, txn, fd, change);
2138  dlist_delete(&change->node);
2139  ReorderBufferReturnChange(rb, change);
2140 
2141  spilled++;
2142  }
2143 
      /* every in-memory change must have been written out exactly once */
2144  Assert(spilled == txn->nentries_mem);
2145  Assert(dlist_is_empty(&txn->changes));
2146  txn->nentries_mem = 0;
2147  txn->serialized = true;
2148 
2149  if (fd != -1)
2150  CloseTransientFile(fd);
2151 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:113
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2386
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:325
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2556
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:433
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1881 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

1883 {
      /*
       * Attach snap as the base snapshot of transaction xid and remember the
       * LSN at which that happened.
       *
       * The transaction is looked up with create = true and
       * create_as_top = true (fourth-from-last and last arguments of
       * ReorderBufferTXNByXid()); is_new is filled in but not examined here.
       * The snapshot pointer is stored as-is -- no copy is made in this
       * function. In assert-enabled builds it is checked that the transaction
       * has no base snapshot yet and that snap is not NULL.
       */
1884  ReorderBufferTXN *txn;
1885  bool is_new;
1886 
1887  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1888  Assert(txn->base_snapshot == NULL);
1889  Assert(snap != NULL);
1890 
1891  txn->base_snapshot = snap;
1892  txn->base_snapshot_lsn = lsn;
1893 }
Snapshot base_snapshot
XLogRecPtr base_snapshot_lsn
#define Assert(condition)
Definition: c.h:699
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 664 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

665 {
      /*
       * NOTE(review): the only statement of this function (source line 666)
       * is missing from this listing. The References list names
       * ReorderBuffer::current_restart_decoding_lsn, so presumably that field
       * of rb is set from ptr -- confirm against the full source.
       */
667 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2728 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

2730 {
      /*
       * Record one toast chunk (the new tuple of "change", a row of the toast
       * relation "relation") in txn->toast_hash, keyed by its chunk id, so
       * that the complete value can be put back together later.
       *
       * Attributes 1, 2 and 3 of the chunk tuple are read as chunk id (Oid),
       * chunk sequence number (int32) and chunk data (varlena pointer).
       * Chunks of one value must arrive in order: the first with sequence
       * number 0, each following one with last_chunk_seq + 1; anything else
       * raises an ERROR.
       */
2731  ReorderBufferToastEnt *ent;
2732  ReorderBufferTupleBuf *newtup;
2733  bool found;
2734  int32 chunksize;
2735  bool isnull;
2736  Pointer chunk;
2737  TupleDesc desc = RelationGetDescr(relation);
2738  Oid chunk_id;
2739  int32 chunk_seq;
2740 
      /* the hash table is created lazily on the first chunk of a transaction */
2741  if (txn->toast_hash == NULL)
2742  ReorderBufferToastInitHash(rb, txn);
2743 
2744  Assert(IsToastRelation(relation));
2745 
2746  newtup = change->data.tp.newtuple;
2747  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2748  Assert(!isnull);
2749  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2750  Assert(!isnull);
2751 
      /* find the entry for this chunk id, creating it if it does not exist */
2752  ent = (ReorderBufferToastEnt *)
2753  hash_search(txn->toast_hash,
2754  (void *) &chunk_id,
2755  HASH_ENTER,
2756  &found);
2757 
2758  if (!found)
2759  {
      /* freshly created entry: initialize all bookkeeping fields */
2760  Assert(ent->chunk_id == chunk_id);
2761  ent->num_chunks = 0;
2762  ent->last_chunk_seq = 0;
2763  ent->size = 0;
2764  ent->reconstructed = NULL;
2765  dlist_init(&ent->chunks);
2766 
2767  if (chunk_seq != 0)
2768  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2769  chunk_seq, chunk_id);
2770  }
      /* "found" is necessarily true in this else branch; the test is redundant but harmless */
2771  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2772  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2773  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2774 
2775  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2776  Assert(!isnull);
2777 
2778  /* calculate size so we can allocate the right size at once later */
2779  if (!VARATT_IS_EXTENDED(chunk))
2780  chunksize = VARSIZE(chunk) - VARHDRSZ;
2781  else if (VARATT_IS_SHORT(chunk))
2782  /* could happen due to heap_form_tuple doing its thing */
2783  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2784  else
2785  elog(ERROR, "unexpected type of toast chunk");
2786 
      /* account for the chunk and queue the change itself on the entry's chunk list */
2787  ent->size += chunksize;
2788  ent->last_chunk_seq = chunk_seq;
2789  ent->num_chunks++;
2790  dlist_push_tail(&ent->chunks, &change->node);
2791 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:731
#define DatumGetInt32(X)
Definition: postgres.h:457
#define RelationGetDescr(relation)
Definition: rel.h:433
#define VARHDRSZ_SHORT
Definition: postgres.h:268
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARHDRSZ
Definition: c.h:522
#define DatumGetObjectId(X)
Definition: postgres.h:485
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:313
struct ReorderBufferChange::@102::@103 tp
char * Pointer
Definition: c.h:302
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
union ReorderBufferChange::@102 data
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define Assert(condition)
Definition: c.h:699
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define DatumGetPointer(X)
Definition: postgres.h:534
#define elog
Definition: elog.h:219
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2707 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2708 {
      /*
       * Create txn->toast_hash, a hash table named "ReorderBufferToastHash"
       * with Oid-sized keys and ReorderBufferToastEnt-sized entries, an
       * initial size hint of 5, and rb->context set as its memory context in
       * the control struct. Must only be called while txn->toast_hash is
       * still NULL (asserted).
       *
       * NOTE(review): source line 2718, carrying the flags argument of
       * hash_create(), is missing from this listing; per the References list
       * it presumably combines HASH_ELEM, HASH_BLOBS and HASH_CONTEXT.
       */
2709  HASHCTL hash_ctl;
2710 
2711  Assert(txn->toast_hash == NULL);
2712 
2713  memset(&hash_ctl, 0, sizeof(hash_ctl));
2714  hash_ctl.keysize = sizeof(Oid);
2715  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2716  hash_ctl.hcxt = rb->context;
2717  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2719 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:699

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2801 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), tupleDesc::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

2803 {
      /*
       * Rewrite the new tuple of "change" so that every externally stored
       * (toasted) attribute whose chunks were collected in txn->toast_hash is
       * replaced by an indirect pointer to the value stitched back together
       * in memory. Does nothing when the transaction has no toast hash.
       *
       * All allocations made here happen in rb->context; the previous memory
       * context is restored before returning normally.
       *
       * NOTE(review):
       * - The local array "free" shadows the C library function of the same
       *   name inside this function, and the inner "bool isnull" in the
       *   chunk loop shadows the outer "bool *isnull" array.
       * - toast_rel is used without a validity check in the lines shown.
       * - Source line 2929 is missing from this listing; presumably it tags
       *   new_datum as an indirect external datum (SET_VARTAG_EXTERNAL and
       *   VARTAG_INDIRECT appear in the References list).
       */
2804  TupleDesc desc;
2805  int natt;
2806  Datum *attrs;
2807  bool *isnull;
2808  bool *free;
2809  HeapTuple tmphtup;
2810  Relation toast_rel;
2811  TupleDesc toast_desc;
2812  MemoryContext oldcontext;
2813  ReorderBufferTupleBuf *newtup;
2814 
2815  /* no toast tuples changed */
2816  if (txn->toast_hash == NULL)
2817  return;
2818 
2819  oldcontext = MemoryContextSwitchTo(rb->context);
2820 
2821  /* we should only have toast tuples in an INSERT or UPDATE */
2822  Assert(change->data.tp.newtuple);
2823 
2824  desc = RelationGetDescr(relation);
2825 
2826  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2827  toast_desc = RelationGetDescr(toast_rel);
2828 
2829  /* should we allocate from stack instead? */
      /* free[i] marks attributes whose datum was replaced by memory allocated below */
2830  attrs = palloc0(sizeof(Datum) * desc->natts);
2831  isnull = palloc0(sizeof(bool) * desc->natts);
2832  free = palloc0(sizeof(bool) * desc->natts);
2833 
2834  newtup = change->data.tp.newtuple;
2835 
2836  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2837 
2838  for (natt = 0; natt < desc->natts; natt++)
2839  {
2840  Form_pg_attribute attr = TupleDescAttr(desc, natt);
2841  ReorderBufferToastEnt *ent;
2842  struct varlena *varlena;
2843 
2844  /* va_rawsize is the size of the original datum -- including header */
2845  struct varatt_external toast_pointer;
2846  struct varatt_indirect redirect_pointer;
2847  struct varlena *new_datum = NULL;
2848  struct varlena *reconstructed;
2849  dlist_iter it;
2850  Size data_done = 0;
2851 
2852  /* system columns aren't toasted */
2853  if (attr->attnum < 0)
2854  continue;
2855 
      /* dropped columns carry no data worth reconstructing */
2856  if (attr->attisdropped)
2857  continue;
2858 
2859  /* not a varlena datatype */
2860  if (attr->attlen != -1)
2861  continue;
2862 
2863  /* no data */
2864  if (isnull[natt])
2865  continue;
2866 
2867  /* ok, we know we have a toast datum */
2868  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2869 
2870  /* no need to do anything if the tuple isn't external */
2871  if (!VARATT_IS_EXTERNAL(varlena))
2872  continue;
2873 
2874  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2875 
2876  /*
2877  * Check whether the toast tuple changed, replace if so.
2878  */
2879  ent = (ReorderBufferToastEnt *)
2880  hash_search(txn->toast_hash,
2881  (void *) &toast_pointer.va_valueid,
2882  HASH_FIND,
2883  NULL);
2884  if (ent == NULL)
2885  continue;
2886 
2887  new_datum =
2888  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2889 
2890  free[natt] = true;
2891 
2892  reconstructed = palloc0(toast_pointer.va_rawsize);
2893 
      /* remembered on the entry; not freed in this function */
2894  ent->reconstructed = reconstructed;
2895 
2896  /* stitch toast tuple back together from its parts */
2897  dlist_foreach(it, &ent->chunks)
2898  {
2899  bool isnull;
2900  ReorderBufferChange *cchange;
2901  ReorderBufferTupleBuf *ctup;
2902  Pointer chunk;
2903 
2904  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2905  ctup = cchange->data.tp.newtuple;
2906  chunk = DatumGetPointer(
2907  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2908 
2909  Assert(!isnull);
2910  Assert(!VARATT_IS_EXTERNAL(chunk));
2911  Assert(!VARATT_IS_SHORT(chunk));
2912 
      /* append this chunk's payload (without its varlena header) at the current offset */
2913  memcpy(VARDATA(reconstructed) + data_done,
2914  VARDATA(chunk),
2915  VARSIZE(chunk) - VARHDRSZ);
2916  data_done += VARSIZE(chunk) - VARHDRSZ;
2917  }
2918  Assert(data_done == toast_pointer.va_extsize);
2919 
2920  /* make sure it's marked as compressed or not */
2921  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2922  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2923  else
2924  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2925 
      /* build the indirect pointer that refers to the in-memory reconstruction */
2926  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2927  redirect_pointer.pointer = reconstructed;
2928 
2930  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2931  sizeof(redirect_pointer));
2932 
2933  attrs[natt] = PointerGetDatum(new_datum);
2934  }
2935 
2936  /*
2937  * Build tuple in separate memory & copy tuple back into the tuplebuf
2938  * passed to the output plugin. We can't directly heap_fill_tuple() into
2939  * the tuplebuf because attrs[] will point back into the current content.
2940  */
2941  tmphtup = heap_form_tuple(desc, attrs, isnull);
      /*
       * NOTE(review): the assertion below checks the length of the tuple
       * being overwritten, while the memcpy() copies tmphtup->t_len bytes;
       * that the rebuilt tuple fits into the existing buffer is not checked
       * in the lines shown -- confirm this is guaranteed elsewhere.
       */
2942  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2943  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2944 
2945  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2946  newtup->tuple.t_len = tmphtup->t_len;
2947 
2948  /*
2949  * Free resources we won't need any further; more persistent stuff will
2950  * be freed in ReorderBufferToastReset().
2951  */
2952  RelationClose(toast_rel);
2953  pfree(tmphtup);
      /* only the indirect-pointer datums allocated above are freed here */
2954  for (natt = 0; natt < desc->natts; natt++)
2955  {
2956  if (free[natt])
2957  pfree(DatumGetPointer(attrs[natt]));
2958  }
2959  pfree(attrs);
2960  pfree(free);
2961  pfree(isnull);
2962 
2963  MemoryContextSwitchTo(oldcontext);
2964 }
#define VARDATA(PTR)
Definition: postgres.h:302
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:731
#define RelationGetDescr(relation)
Definition: rel.h:433
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define VARSIZE(PTR)
Definition: postgres.h:303
#define PointerGetDatum(X)
Definition: postgres.h:541
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
#define VARHDRSZ
Definition: c.h:522
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Form_pg_class rd_rel
Definition: rel.h:84
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
int natts
Definition: tupdesc.h:82
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferChange::@102::@103 tp
void pfree(void *pointer)
Definition: mcxt.c:1031
char * Pointer
Definition: c.h:302
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:573
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void RelationClose(Relation relation)
Definition: relcache.c:1996
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:955
uintptr_t Datum
Definition: postgres.h:367
union ReorderBufferChange::@102 data
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:65
#define Assert(condition)
Definition: c.h:699
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
size_t Size
Definition: c.h:433
#define DatumGetPointer(X)
Definition: postgres.h:534
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1315
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
Definition: c.h:516
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1906

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 2970 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

2971 {
2972  HASH_SEQ_STATUS hstat;
2973  ReorderBufferToastEnt *ent;
2974 
2975  if (txn->toast_hash == NULL)
2976  return;
2977 
2978  /* sequentially walk over the hash and free everything */
2979  hash_seq_init(&hstat, txn->toast_hash);
2980  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2981  {
2982  dlist_mutable_iter it;
2983 
2984  if (ent->reconstructed != NULL)
2985  pfree(ent->reconstructed);
2986 
2987  dlist_foreach_modify(it, &ent->chunks)
2988  {
2989  ReorderBufferChange *change =
2990  dlist_container(ReorderBufferChange, node, it.cur);
2991 
2992  dlist_delete(&change->node);
2993  ReorderBufferReturnChange(rb, change);
2994  }
2995  }
2996 
2997  hash_destroy(txn->toast_hash);
2998  txn->toast_hash = NULL;
2999 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer * rb,
TransactionId  xid,
bool  create,
bool * is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 466 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXNByIdEnt::xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

468 {
469  ReorderBufferTXN *txn;
470  ReorderBufferTXNByIdEnt *ent;
471  bool found;
472 
473  Assert(TransactionIdIsValid(xid));
474  Assert(!create || lsn != InvalidXLogRecPtr);
475 
476  /*
477  * Check the one-entry lookup cache first
478  */
479  if (TransactionIdIsValid(rb->by_txn_last_xid) &&
480  rb->by_txn_last_xid == xid)
481  {
482  txn = rb->by_txn_last_txn;
483 
484  if (txn != NULL)
485  {
486  /* found it, and it's valid */
487  if (is_new)
488  *is_new = false;
489  return txn;
490  }
491 
492  /*
493  * cached as non-existent, and asked not to create? Then nothing else
494  * to do.
495  */
496  if (!create)
497  return NULL;
498  /* otherwise fall through to create it */
499  }
500 
501  /*
502  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
503  * to create an entry.
504  */
505 
506  /* search the lookup table */
507  ent = (ReorderBufferTXNByIdEnt *)
508  hash_search(rb->by_txn,
509  (void *) &xid,
510  create ? HASH_ENTER : HASH_FIND,
511  &found);
512  if (found)
513  txn = ent->txn;
514  else if (create)
515  {
516  /* initialize the new entry, if creation was requested */
517  Assert(ent != NULL);
518 
519  ent->txn = ReorderBufferGetTXN(rb);
520  ent->txn->xid = xid;
521  txn = ent->txn;
522  txn->first_lsn = lsn;
523  txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
524 
525  if (create_as_top)
526  {
527  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
528  AssertTXNLsnOrder(rb);
529  }
530  }
531  else
532  txn = NULL; /* not found and not asked to create */
533 
534  /* update cache */
535  rb->by_txn_last_xid = xid;
536  rb->by_txn_last_txn = txn;
537 
538  if (is_new)
539  *is_new = !found;
540 
541  Assert(!create || txn != NULL);
542  return txn;
543 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
TransactionId xid
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:699
ReorderBufferTXN * txn
Definition: reorderbuffer.c:88
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 2014 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2015 {
2016  ReorderBufferTXN *txn;
2017 
2018  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2019  false);
2020 
2021  /* transaction isn't known yet, ergo no snapshot */
2022  if (txn == NULL)
2023  return false;
2024 
2025  /*
2026  * TODO: It would be a nice improvement if we would check the toplevel
2027  * transaction in subtransactions, but we'd need to keep track of a bit
2028  * more state.
2029  */
2030  return txn->base_snapshot != NULL;
2031 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 1998 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

1999 {
2000  ReorderBufferTXN *txn;
2001 
2002  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2003  false);
2004  if (txn == NULL)
2005  return false;
2006 
2007  return txn->has_catalog_changes;
2008 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1983 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1985 {
1986  ReorderBufferTXN *txn;
1987 
1988  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1989 
1990  txn->has_catalog_changes = true;
1991 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB * tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId * cmin,
CommandId * cmax 
)

Definition at line 3278 of file reorderbuffer.c.

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy, ItemPointerGetBlockNumber, MAIN_FORKNUM, ReorderBufferTupleCidKey::relnode, HeapTupleData::t_self, HeapTupleData::t_tableOid, ReorderBufferTupleCidKey::tid, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

3282 {
3283  ReorderBufferTupleCidKey key;
3284  ReorderBufferTupleCidEnt *ent;
3285  ForkNumber forkno;
3286  BlockNumber blockno;
3287  bool updated_mapping = false;
3288 
3289  /* be careful about padding */
3290  memset(&key, 0, sizeof(key));
3291 
3292  Assert(!BufferIsLocal(buffer));
3293 
3294  /*
3295  * get relfilenode from the buffer, no convenient way to access it other
3296  * than that.
3297  */
3298  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3299 
3300  /* tuples can only be in the main fork */
3301  Assert(forkno == MAIN_FORKNUM);
3302  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3303 
3304  ItemPointerCopy(&htup->t_self,
3305  &key.tid);
3306 
3307 restart:
3308  ent = (ReorderBufferTupleCidEnt *)
3309  hash_search(tuplecid_data,
3310  (void *) &key,
3311  HASH_FIND,
3312  NULL);
3313 
3314  /*
3315  * failed to find a mapping, check whether the table was rewritten and
3316  * apply mapping if so, but only do that once - there can be no new
3317  * mappings while we are in here since we have to hold a lock on the
3318  * relation.
3319  */
3320  if (ent == NULL && !updated_mapping)
3321  {
3322  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3323  /* now check but don't update for a mapping again */
3324  updated_mapping = true;
3325  goto restart;
3326  }
3327  else if (ent == NULL)
3328  return false;
3329 
3330  if (cmin)
3331  *cmin = ent->cmin;
3332  if (cmax)
3333  *cmax = ent->cmax;
3334  return true;
3335 }
uint32 BlockNumber
Definition: block.h:31
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66
ForkNumber
Definition: relpath.h:40
#define Assert(condition)
Definition: c.h:699
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define BufferIsLocal(buffer)
Definition: buf.h:37
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 2673 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

2674 {
2675  DIR *logical_dir;
2676  struct dirent *logical_de;
2677 
2678  logical_dir = AllocateDir("pg_replslot");
2679  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2680  {
2681  if (strcmp(logical_de->d_name, ".") == 0 ||
2682  strcmp(logical_de->d_name, "..") == 0)
2683  continue;
2684 
2685  /* if it cannot be a slot, skip the directory */
2686  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2687  continue;
2688 
2689  /*
2690  * ok, has to be a surviving logical slot, iterate and delete
2691  * everything starting with xid-*
2692  */
2693  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2694  }
2695  FreeDir(logical_dir);
2696 }
Definition: dirent.h:9
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define DEBUG2
Definition: elog.h:24
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2590
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2656
char d_name[MAX_PATH]
Definition: dirent.h:14
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
int FreeDir(DIR *dir)
Definition: fd.c:2708

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId * xip,
Size  num 
)
static

Definition at line 3162 of file reorderbuffer.c.

References xidComparator().

Referenced by UpdateLogicalMappings().

3163 {
3164  return bsearch(&xid, xip, num,
3165  sizeof(TransactionId), xidComparator) != NULL;
3166 }
uint32 TransactionId
Definition: c.h:474
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138

◆ UpdateLogicalMappings()