PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3280 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3281 {
3282  char path[MAXPGPATH];
3283  int fd;
3284  int readBytes;
3286 
3287  sprintf(path, "pg_logical/mappings/%s", fname);
3288  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3289  if (fd < 0)
3290  ereport(ERROR,
3292  errmsg("could not open file \"%s\": %m", path)));
3293 
3294  while (true)
3295  {
3298  ReorderBufferTupleCidEnt *new_ent;
3299  bool found;
3300 
3301  /* be careful about padding */
3302  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3303 
3304  /* read all mappings till the end of the file */
3306  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3308 
3309  if (readBytes < 0)
3310  ereport(ERROR,
3312  errmsg("could not read file \"%s\": %m",
3313  path)));
3314  else if (readBytes == 0) /* EOF */
3315  break;
3316  else if (readBytes != sizeof(LogicalRewriteMappingData))
3317  ereport(ERROR,
3319  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3320  path, readBytes,
3321  (int32) sizeof(LogicalRewriteMappingData))));
3322 
3323  key.relnode = map.old_node;
3324  ItemPointerCopy(&map.old_tid,
3325  &key.tid);
3326 
3327 
3328  ent = (ReorderBufferTupleCidEnt *)
3329  hash_search(tuplecid_data,
3330  (void *) &key,
3331  HASH_FIND,
3332  NULL);
3333 
3334  /* no existing mapping, no need to update */
3335  if (!ent)
3336  continue;
3337 
3338  key.relnode = map.new_node;
3339  ItemPointerCopy(&map.new_tid,
3340  &key.tid);
3341 
3342  new_ent = (ReorderBufferTupleCidEnt *)
3343  hash_search(tuplecid_data,
3344  (void *) &key,
3345  HASH_ENTER,
3346  &found);
3347 
3348  if (found)
3349  {
3350  /*
3351  * Make sure the existing mapping makes sense. We sometime update
3352  * old records that did not yet have a cmax (e.g. pg_class' own
3353  * entry while rewriting it) during rewrites, so allow that.
3354  */
3355  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3356  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3357  }
3358  else
3359  {
3360  /* update mapping */
3361  new_ent->cmin = ent->cmin;
3362  new_ent->cmax = ent->cmax;
3363  new_ent->combocid = ent->combocid;
3364  }
3365  }
3366 
3367  if (CloseTransientFile(fd) != 0)
3368  ereport(ERROR,
3370  errmsg("could not close file \"%s\": %m", path)));
3371 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
signed int int32
Definition: c.h:346
#define sprintf
Definition: port.h:194
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2255
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:593
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2432
#define InvalidCommandId
Definition: c.h:524
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:732
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 659 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

660 {
661 #ifdef USE_ASSERT_CHECKING
662  dlist_iter iter;
663  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
664  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
665 
666  dlist_foreach(iter, &rb->toplevel_by_lsn)
667  {
669  iter.cur);
670 
671  /* start LSN must be set */
672  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
673 
674  /* If there is an end LSN, it must be higher than start LSN */
675  if (cur_txn->end_lsn != InvalidXLogRecPtr)
676  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
677 
678  /* Current initial LSN must be strictly higher than previous */
679  if (prev_first_lsn != InvalidXLogRecPtr)
680  Assert(prev_first_lsn < cur_txn->first_lsn);
681 
682  /* known-as-subtxn txns must not be listed */
683  Assert(!cur_txn->is_known_as_subxact);
684 
685  prev_first_lsn = cur_txn->first_lsn;
686  }
687 
689  {
691  base_snapshot_node,
692  iter.cur);
693 
694  /* base snapshot (and its LSN) must be set */
695  Assert(cur_txn->base_snapshot != NULL);
697 
698  /* current LSN must be strictly higher than previous */
699  if (prev_base_snap_lsn != InvalidXLogRecPtr)
700  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
701 
702  /* known-as-subtxn txns must not be listed */
703  Assert(!cur_txn->is_known_as_subxact);
704 
705  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
706  }
707 #endif
708 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
XLogRecPtr end_lsn

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 3388 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3389 {
3392 
3393  if (a->lsn < b->lsn)
3394  return -1;
3395  else if (a->lsn > b->lsn)
3396  return 1;
3397  return 0;
3398 }
#define lfirst(lc)
Definition: pg_list.h:190

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1856 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1857 {
1858  ReorderBufferTXN *txn;
1859 
1860  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1861  false);
1862 
1863  /* unknown, nothing to remove */
1864  if (txn == NULL)
1865  return;
1866 
1867  /* cosmetic... */
1868  txn->final_lsn = lsn;
1869 
1870  /* remove potential on-disk data, and deallocate */
1871  ReorderBufferCleanupTXN(rb, txn);
1872 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 1882 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1883 {
1884  dlist_mutable_iter it;
1885 
1886  /*
1887  * Iterate through all (potential) toplevel TXNs and abort all that are
1888  * older than what possibly can be running. Once we've found the first
1889  * that is alive we stop, there might be some that acquired an xid earlier
1890  * but started writing later, but it's unlikely and they will be cleaned
1891  * up in a later call to this function.
1892  */
1894  {
1895  ReorderBufferTXN *txn;
1896 
1897  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1898 
1899  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1900  {
1901  /*
1902  * We set final_lsn on a transaction when we decode its commit or
1903  * abort record, but we never see those records for crashed
1904  * transactions. To ensure cleanup of these transactions, set
1905  * final_lsn to that of their last change; this causes
1906  * ReorderBufferRestoreCleanup to do the right thing.
1907  */
1908  if (txn->serialized && txn->final_lsn == 0)
1909  {
1910  ReorderBufferChange *last =
1912 
1913  txn->final_lsn = last->lsn;
1914  }
1915 
1916  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1917 
1918  /* remove potential on-disk data, and deallocate this tx */
1919  ReorderBufferCleanupTXN(rb, txn);
1920  }
1921  else
1922  return;
1923  }
1924 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:226

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 2118 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2121 {
2122  ReorderBufferTXN *txn;
2123 
2124  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2125 
2126  if (txn->ninvalidations != 0)
2127  elog(ERROR, "only ever add one set of invalidations");
2128 
2129  Assert(nmsgs > 0);
2130 
2131  txn->ninvalidations = nmsgs;
2134  sizeof(SharedInvalidationMessage) * nmsgs);
2135  memcpy(txn->invalidations, msgs,
2136  sizeof(SharedInvalidationMessage) * nmsgs);
2137 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:732
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:226

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2074 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2076 {
2078 
2079  change->data.command_id = cid;
2081 
2082  ReorderBufferQueueChange(rb, xid, lsn, change);
2083 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2090 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2094 {
2096  ReorderBufferTXN *txn;
2097 
2098  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2099 
2100  change->data.tuplecid.node = node;
2101  change->data.tuplecid.tid = tid;
2102  change->data.tuplecid.cmin = cmin;
2103  change->data.tuplecid.cmax = cmax;
2104  change->data.tuplecid.combocid = combocid;
2105  change->lsn = lsn;
2107 
2108  dlist_push_tail(&txn->tuplecids, &change->node);
2109  txn->ntuplecids++;
2110 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferChange::@101::@105 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@101 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2025 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2027 {
2029 
2030  change->data.snapshot = snap;
2032 
2033  ReorderBufferQueueChange(rb, xid, lsn, change);
2034 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 226 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

227 {
228  ReorderBuffer *buffer;
229  HASHCTL hash_ctl;
230  MemoryContext new_ctx;
231 
232  Assert(MyReplicationSlot != NULL);
233 
234  /* allocate memory in own context, to have better accountability */
236  "ReorderBuffer",
238 
239  buffer =
240  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
241 
242  memset(&hash_ctl, 0, sizeof(hash_ctl));
243 
244  buffer->context = new_ctx;
245 
246  buffer->change_context = SlabContextCreate(new_ctx,
247  "Change",
249  sizeof(ReorderBufferChange));
250 
251  buffer->txn_context = SlabContextCreate(new_ctx,
252  "TXN",
254  sizeof(ReorderBufferTXN));
255 
256  buffer->tup_context = GenerationContextCreate(new_ctx,
257  "Tuples",
259 
260  hash_ctl.keysize = sizeof(TransactionId);
261  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
262  hash_ctl.hcxt = buffer->context;
263 
264  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
266 
268  buffer->by_txn_last_txn = NULL;
269 
270  buffer->outbuf = NULL;
271  buffer->outbufsize = 0;
272 
274 
275  dlist_init(&buffer->toplevel_by_lsn);
277 
278  /*
279  * Ensure there's no stale data from prior uses of this slot, in case some
280  * prior exit avoided calling ReorderBufferFree. Failure to do this can
281  * produce duplicated txns, and it's very cheap if there's nothing there.
282  */
284 
285  return buffer;
286 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:507
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:188
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define NameStr(name)
Definition: c.h:609
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 770 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXNByIdEnt::xid.

Referenced by DecodeXactOp(), and ReorderBufferCommitChild().

772 {
773  ReorderBufferTXN *txn;
774  ReorderBufferTXN *subtxn;
775  bool new_top;
776  bool new_sub;
777 
778  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
779  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
780 
781  if (new_top && !new_sub)
782  elog(ERROR, "subtransaction logged without previous top-level txn record");
783 
784  if (!new_sub)
785  {
786  if (subtxn->is_known_as_subxact)
787  {
788  /* already associated, nothing to do */
789  return;
790  }
791  else
792  {
793  /*
794  * We already saw this transaction, but initially added it to the
795  * list of top-level txns. Now that we know it's not top-level,
796  * remove it from there.
797  */
798  dlist_delete(&subtxn->node);
799  }
800  }
801 
802  subtxn->is_known_as_subxact = true;
803  subtxn->toplevel_xid = xid;
804  Assert(subtxn->nsubtxns == 0);
805 
806  /* add to subtransaction list */
807  dlist_push_tail(&txn->subtxns, &subtxn->node);
808  txn->nsubtxns++;
809 
810  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
812 
813  /* Verify LSN-ordering invariant */
814  AssertTXNLsnOrder(rb);
815 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:732
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog(elevel,...)
Definition: elog.h:226
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1275 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1276 {
1277  dlist_iter iter;
1278  HASHCTL hash_ctl;
1279 
1280  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1281  return;
1282 
1283  memset(&hash_ctl, 0, sizeof(hash_ctl));
1284 
1285  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1286  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1287  hash_ctl.hcxt = rb->context;
1288 
1289  /*
1290  * create the hash with the exact number of to-be-stored tuplecids from
1291  * the start
1292  */
1293  txn->tuplecid_hash =
1294  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1296 
1297  dlist_foreach(iter, &txn->tuplecids)
1298  {
1301  bool found;
1302  ReorderBufferChange *change;
1303 
1304  change = dlist_container(ReorderBufferChange, node, iter.cur);
1305 
1307 
1308  /* be careful about padding */
1309  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1310 
1311  key.relnode = change->data.tuplecid.node;
1312 
1313  ItemPointerCopy(&change->data.tuplecid.tid,
1314  &key.tid);
1315 
1316  ent = (ReorderBufferTupleCidEnt *)
1318  (void *) &key,
1320  &found);
1321  if (!found)
1322  {
1323  ent->cmin = change->data.tuplecid.cmin;
1324  ent->cmax = change->data.tuplecid.cmax;
1325  ent->combocid = change->data.tuplecid.combocid;
1326  }
1327  else
1328  {
1329  /*
1330  * Maybe we already saw this tuple before in this transaction, but
1331  * if so it must have the same cmin.
1332  */
1333  Assert(ent->cmin == change->data.tuplecid.cmin);
1334 
1335  /*
1336  * cmax may be initially invalid, but once set it can only grow,
1337  * and never become invalid again.
1338  */
1339  Assert((ent->cmax == InvalidCommandId) ||
1340  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1341  (change->data.tuplecid.cmax > ent->cmax)));
1342  ent->cmax = change->data.tuplecid.cmax;
1343  }
1344  }
1345 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
struct ReorderBufferChange::@101::@105 tuplecid
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:524
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCheckSerializeTXN()

static void ReorderBufferCheckSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2236 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2237 {
2238  /*
2239  * TODO: improve accounting so we cheaply can take subtransactions into
2240  * account here.
2241  */
2242  if (txn->nentries_mem >= max_changes_in_memory)
2243  {
2244  ReorderBufferSerializeTXN(rb, txn);
2245  Assert(txn->nentries_mem == 0);
2246  }
2247 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:732
static const Size max_changes_in_memory

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 2823 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

2824 {
2825  DIR *spill_dir;
2826  struct dirent *spill_de;
2827  struct stat statbuf;
2828  char path[MAXPGPATH * 2 + 12];
2829 
2830  sprintf(path, "pg_replslot/%s", slotname);
2831 
2832  /* we're only handling directories here, skip if it's not ours */
2833  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2834  return;
2835 
2836  spill_dir = AllocateDir(path);
2837  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2838  {
2839  /* only look at names that can be ours */
2840  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2841  {
2842  snprintf(path, sizeof(path),
2843  "pg_replslot/%s/%s", slotname,
2844  spill_de->d_name);
2845 
2846  if (unlink(path) != 0)
2847  ereport(ERROR,
2849  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
2850  path, slotname)));
2851  }
2852  }
2853  FreeDir(spill_dir);
2854 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2547
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:194
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:593
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2466
#define ereport(elevel, rest)
Definition: elog.h:141
#define stat(a, b)
Definition: win32_port.h:255
#define S_ISDIR(m)
Definition: win32_port.h:296
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:784
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:192
int FreeDir(DIR *dir)
Definition: fd.c:2584

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1190 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::serialized, SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1191 {
1192  bool found;
1193  dlist_mutable_iter iter;
1194 
1195  /* cleanup subtransactions & their changes */
1196  dlist_foreach_modify(iter, &txn->subtxns)
1197  {
1198  ReorderBufferTXN *subtxn;
1199 
1200  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1201 
1202  /*
1203  * Subtransactions are always associated to the toplevel TXN, even if
1204  * they originally were happening inside another subtxn, so we won't
1205  * ever recurse more than one level deep here.
1206  */
1207  Assert(subtxn->is_known_as_subxact);
1208  Assert(subtxn->nsubtxns == 0);
1209 
1210  ReorderBufferCleanupTXN(rb, subtxn);
1211  }
1212 
1213  /* cleanup changes in the toplevel txn */
1214  dlist_foreach_modify(iter, &txn->changes)
1215  {
1216  ReorderBufferChange *change;
1217 
1218  change = dlist_container(ReorderBufferChange, node, iter.cur);
1219 
1220  ReorderBufferReturnChange(rb, change);
1221  }
1222 
1223  /*
1224  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1225  * They are always stored in the toplevel transaction.
1226  */
1227  dlist_foreach_modify(iter, &txn->tuplecids)
1228  {
1229  ReorderBufferChange *change;
1230 
1231  change = dlist_container(ReorderBufferChange, node, iter.cur);
1232  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1233  ReorderBufferReturnChange(rb, change);
1234  }
1235 
1236  /*
1237  * Cleanup the base snapshot, if set.
1238  */
1239  if (txn->base_snapshot != NULL)
1240  {
1241  SnapBuildSnapDecRefcount(txn->base_snapshot);
1242  dlist_delete(&txn->base_snapshot_node);
1243  }
1244 
1245  /*
1246  * Remove TXN from its containing list.
1247  *
1248  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1249  * parent's list of known subxacts; this leaves the parent's nsubtxns
1250  * count too high, but we don't care. Otherwise, we are deleting the TXN
1251  * from the LSN-ordered list of toplevel TXNs.
1252  */
1253  dlist_delete(&txn->node);
1254 
1255  /* now remove reference from buffer */
1256  hash_search(rb->by_txn,
1257  (void *) &txn->xid,
1258  HASH_REMOVE,
1259  &found);
1260  Assert(found);
1261 
1262  /* remove entries spilled to disk */
1263  if (txn->serialized)
1264  ReorderBufferRestoreCleanup(rb, txn);
1265 
1266  /* deallocate */
1267  ReorderBufferReturnTXN(rb, txn);
1268 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:732
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:433
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1434 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), 
TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1438 {
1439  ReorderBufferTXN *txn;
1440  volatile Snapshot snapshot_now;
1441  volatile CommandId command_id = FirstCommandId;
1442  bool using_subtxn;
1443  ReorderBufferIterTXNState *volatile iterstate = NULL;
1444 
1445  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1446  false);
1447 
1448  /* unknown transaction, nothing to replay */
1449  if (txn == NULL)
1450  return;
1451 
1452  txn->final_lsn = commit_lsn;
1453  txn->end_lsn = end_lsn;
1454  txn->commit_time = commit_time;
1455  txn->origin_id = origin_id;
1456  txn->origin_lsn = origin_lsn;
1457 
1458  /*
1459  * If this transaction has no snapshot, it didn't make any changes to the
1460  * database, so there's nothing to decode. Note that
1461  * ReorderBufferCommitChild will have transferred any snapshots from
1462  * subtransactions if there were any.
1463  */
1464  if (txn->base_snapshot == NULL)
1465  {
1466  Assert(txn->ninvalidations == 0);
1467  ReorderBufferCleanupTXN(rb, txn);
1468  return;
1469  }
1470 
1471  snapshot_now = txn->base_snapshot;
1472 
1473  /* build data to be able to lookup the CommandIds of catalog tuples */
1474  ReorderBufferBuildTupleCidHash(rb, txn);
1475 
1476  /* setup the initial snapshot */
1477  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1478 
1479  /*
1480  * Decoding needs access to syscaches et al., which in turn use
1481  * heavyweight locks and such. Thus we need to have enough state around to
1482  * keep track of those. The easiest way is to simply use a transaction
1483  * internally. That also allows us to easily enforce that nothing writes
1484  * to the database by checking for xid assignments.
1485  *
1486  * When we're called via the SQL SRF there's already a transaction
1487  * started, so start an explicit subtransaction there.
1488  */
1489  using_subtxn = IsTransactionOrTransactionBlock();
1490 
1491  PG_TRY();
1492  {
1493  ReorderBufferChange *change;
1494  ReorderBufferChange *specinsert = NULL;
1495 
1496  if (using_subtxn)
1497  BeginInternalSubTransaction("replay");
1498  else
1499  StartTransactionCommand();
1500 
1501  rb->begin(rb, txn);
1502 
1503  iterstate = ReorderBufferIterTXNInit(rb, txn);
1504  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1505  {
1506  Relation relation = NULL;
1507  Oid reloid;
1508 
1509  switch (change->action)
1510  {
1511  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1512 
1513  /*
1514  * Confirmation for speculative insertion arrived. Simply
1515  * use as a normal record. It'll be cleaned up at the end
1516  * of INSERT processing.
1517  */
1518  if (specinsert == NULL)
1519  elog(ERROR, "invalid ordering of speculative insertion changes");
1520  Assert(specinsert->data.tp.oldtuple == NULL);
1521  change = specinsert;
1522  change->action = REORDER_BUFFER_CHANGE_INSERT;
1523 
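1524  /* intentionally fall through */
1525  case REORDER_BUFFER_CHANGE_INSERT:
1526  case REORDER_BUFFER_CHANGE_UPDATE:
1527  case REORDER_BUFFER_CHANGE_DELETE: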
1528  Assert(snapshot_now);
1529 
1530  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1531  change->data.tp.relnode.relNode);
1532 
1533  /*
1534  * Mapped catalog tuple without data, emitted while
1535  * catalog table was in the process of being rewritten. We
1536  * can fail to look up the relfilenode, because the
1537  * relmapper has no "historic" view, in contrast to
1538  * the normal catalog during decoding. Thus repeated
1539  * rewrites can cause a lookup failure. That's OK because
1540  * we do not decode catalog changes anyway. Normally such
1541  * tuples would be skipped over below, but we can't
1542  * identify whether the table should be logically logged
1543  * without mapping the relfilenode to the oid.
1544  */
1545  if (reloid == InvalidOid &&
1546  change->data.tp.newtuple == NULL &&
1547  change->data.tp.oldtuple == NULL)
1548  goto change_done;
1549  else if (reloid == InvalidOid)
1550  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1551  relpathperm(change->data.tp.relnode,
1552  MAIN_FORKNUM));
1553 
1554  relation = RelationIdGetRelation(reloid);
1555 
1556  if (!RelationIsValid(relation))
1557  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1558  reloid,
1559  relpathperm(change->data.tp.relnode,
1560  MAIN_FORKNUM));
1561 
1562  if (!RelationIsLogicallyLogged(relation))
1563  goto change_done;
1564 
1565  /*
1566  * Ignore temporary heaps created during DDL unless the
1567  * plugin has asked for them.
1568  */
1569  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1570  goto change_done;
1571 
1572  /*
1573  * For now ignore sequence changes entirely. Most of the
1574  * time they don't log changes using records we
1575  * understand, so it doesn't make sense to handle the few
1576  * cases we do.
1577  */
1578  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1579  goto change_done;
1580 
1581  /* user-triggered change */
1582  if (!IsToastRelation(relation))
1583  {
1584  ReorderBufferToastReplace(rb, txn, relation, change);
1585  rb->apply_change(rb, txn, relation, change);
1586 
1587  /*
1588  * Only clear reassembled toast chunks if we're sure
1589  * they're not required anymore. The creator of the
1590  * tuple tells us.
1591  */
1592  if (change->data.tp.clear_toast_afterwards)
1593  ReorderBufferToastReset(rb, txn);
1594  }
1595  /* we're not interested in toast deletions */
1596  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1597  {
1598  /*
1599  * Need to reassemble the full toasted Datum in
1600  * memory. To ensure the chunks don't get reused till
1601  * we're done, remove it from the list of this
1602  * transaction's changes. Otherwise it will get
1603  * freed/reused while restoring spooled data from
1604  * disk.
1605  */
1606  Assert(change->data.tp.newtuple != NULL);
1607 
1608  dlist_delete(&change->node);
1609  ReorderBufferToastAppendChunk(rb, txn, relation,
1610  change);
1611  }
1612 
1613  change_done:
1614 
1615  /*
1616  * Either speculative insertion was confirmed, or it was
1617  * unsuccessful and the record isn't needed anymore.
1618  */
1619  if (specinsert != NULL)
1620  {
1621  ReorderBufferReturnChange(rb, specinsert);
1622  specinsert = NULL;
1623  }
1624 
1625  if (relation != NULL)
1626  {
1627  RelationClose(relation);
1628  relation = NULL;
1629  }
1630  break;
1631 
1632  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1633 
1634  /*
1635  * Speculative insertions are dealt with by delaying the
1636  * processing of the insert until the confirmation record
1637  * arrives. For that we simply unlink the record from the
1638  * chain, so it does not get freed/reused while restoring
1639  * spooled data from disk.
1640  *
1641  * This is safe in the face of concurrent catalog changes
1642  * because the relevant relation can't be changed between
1643  * speculative insertion and confirmation due to
1644  * CheckTableNotInUse() and locking.
1645  */
1646 
1647  /* clear out a pending (and thus failed) speculation */
1648  if (specinsert != NULL)
1649  {
1650  ReorderBufferReturnChange(rb, specinsert);
1651  specinsert = NULL;
1652  }
1653 
1654  /* and memorize the pending insertion */
1655  dlist_delete(&change->node);
1656  specinsert = change;
1657  break;
1658 
1659  case REORDER_BUFFER_CHANGE_TRUNCATE:
1660  {
1661  int i;
1662  int nrelids = change->data.truncate.nrelids;
1663  int nrelations = 0;
1664  Relation *relations;
1665 
1666  relations = palloc0(nrelids * sizeof(Relation));
1667  for (i = 0; i < nrelids; i++)
1668  {
1669  Oid relid = change->data.truncate.relids[i];
1670  Relation relation;
1671 
1672  relation = RelationIdGetRelation(relid);
1673 
1674  if (!RelationIsValid(relation))
1675  elog(ERROR, "could not open relation with OID %u", relid);
1676 
1677  if (!RelationIsLogicallyLogged(relation))
1678  continue;
1679 
1680  relations[nrelations++] = relation;
1681  }
1682 
1683  rb->apply_truncate(rb, txn, nrelations, relations, change);
1684 
1685  for (i = 0; i < nrelations; i++)
1686  RelationClose(relations[i]);
1687 
1688  break;
1689  }
1690 
1691  case REORDER_BUFFER_CHANGE_MESSAGE:
1692  rb->message(rb, txn, change->lsn, true,
1693  change->data.msg.prefix,
1694  change->data.msg.message_size,
1695  change->data.msg.message);
1696  break;
1697 
1698  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1699  /* get rid of the old */
1700  TeardownHistoricSnapshot(false);
1701 
1702  if (snapshot_now->copied)
1703  {
1704  ReorderBufferFreeSnap(rb, snapshot_now);
1705  snapshot_now =
1706  ReorderBufferCopySnap(rb, change->data.snapshot,
1707  txn, command_id);
1708  }
1709 
1710  /*
1711  * Restored from disk, need to be careful not to double
1712  * free. We could introduce refcounting for that, but for
1713  * now this seems infrequent enough not to care.
1714  */
1715  else if (change->data.snapshot->copied)
1716  {
1717  snapshot_now =
1718  ReorderBufferCopySnap(rb, change->data.snapshot,
1719  txn, command_id);
1720  }
1721  else
1722  {
1723  snapshot_now = change->data.snapshot;
1724  }
1725 
1726 
1727  /* and continue with the new one */
1728  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1729  break;
1730 
1731  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1732  Assert(change->data.command_id != InvalidCommandId);
1733 
1734  if (command_id < change->data.command_id)
1735  {
1736  command_id = change->data.command_id;
1737 
1738  if (!snapshot_now->copied)
1739  {
1740  /* we don't use the global one anymore */
1741  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1742  txn, command_id);
1743  }
1744 
1745  snapshot_now->curcid = command_id;
1746 
1747  TeardownHistoricSnapshot(false);
1748  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1749 
1750  /*
1751  * Every time the CommandId is incremented, we could
1752  * see new catalog contents, so execute all
1753  * invalidations.
1754  */
1755  ReorderBufferExecuteInvalidations(rb, txn);
1756  }
1757 
1758  break;
1759 
1760  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1761  elog(ERROR, "tuplecid value in changequeue");
1762  break;
1763  }
1764  }
1765 
1766  /*
1767  * There's a speculative insertion remaining, just clean it up; it
1768  * can't have been successful, otherwise we'd have gotten a confirmation
1769  * record.
1770  */
1771  if (specinsert)
1772  {
1773  ReorderBufferReturnChange(rb, specinsert);
1774  specinsert = NULL;
1775  }
1776 
1777  /* clean up the iterator */
1778  ReorderBufferIterTXNFinish(rb, iterstate);
1779  iterstate = NULL;
1780 
1781  /* call commit callback */
1782  rb->commit(rb, txn, commit_lsn);
1783 
1784  /* this is just a sanity check against bad output plugin behaviour */
1785  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1786  elog(ERROR, "output plugin used XID %u",
1787  GetCurrentTransactionId());
1788 
1789  /* cleanup */
1790  TeardownHistoricSnapshot(false);
1791 
1792  /*
1793  * Aborting the current (sub-)transaction as a whole has the right
1794  * semantics. We want all locks acquired in here to be released, not
1795  * reassigned to the parent and we do not want any database access
1796  * have persistent effects.
1797  */
1798  AbortCurrentTransaction();
1799 
1800  /* make sure there's no cache pollution */
1801  ReorderBufferExecuteInvalidations(rb, txn);
1802 
1803  if (using_subtxn)
1804  RollbackAndReleaseCurrentSubTransaction();
1805 
1806  if (snapshot_now->copied)
1807  ReorderBufferFreeSnap(rb, snapshot_now);
1808 
1809  /* remove potential on-disk data, and deallocate */
1810  ReorderBufferCleanupTXN(rb, txn);
1811  }
1812  PG_CATCH();
1813  {
1814  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1815  if (iterstate)
1816  ReorderBufferIterTXNFinish(rb, iterstate);
1817 
1818  TeardownHistoricSnapshot(true);
1819 
1820  /*
1821  * Force cache invalidation to happen outside of a valid transaction
1822  * to prevent catalog access as we just caught an error.
1823  */
1824  AbortCurrentTransaction();
1825 
1826  /* make sure there's no cache pollution */
1827  ReorderBufferExecuteInvalidations(rb, txn);
1828 
1829  if (using_subtxn)
1830  RollbackAndReleaseCurrentSubTransaction();
1831 
1832  if (snapshot_now->copied)
1833  ReorderBufferFreeSnap(rb, snapshot_now);
1834 
1835  /* remove potential on-disk data, and deallocate */
1836  ReorderBufferCleanupTXN(rb, txn);
1837 
1838  PG_RE_THROW();
1839  }
1840  PG_END_TRY();
1841 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:521
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:3159
bool IsToastRelation(Relation relation)
Definition: catalog.c:142
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4651
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:591
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:523
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:392
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4462
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:440
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2089
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidCommandId
Definition: c.h:524
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:310
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2794
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4357
#define PG_RE_THROW()
Definition: elog.h:331
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferApplyTruncateCB apply_truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:226
int i
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:301
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1990
#define PG_END_TRY()
Definition: elog.h:317
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 890 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

893 {
894  ReorderBufferTXN *subtxn;
895 
896  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
897  InvalidXLogRecPtr, false);
898 
899  /*
900  * No need to do anything if that subtxn didn't contain any changes
901  */
902  if (!subtxn)
903  return;
904 
905  subtxn->final_lsn = commit_lsn;
906  subtxn->end_lsn = end_lsn;
907 
908  /*
909  * Assign this subxact as a child of the toplevel xact (no-op if already
910  * done.)
911  */
912  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
913 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1353 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1355 {
1356  Snapshot snap;
1357  dlist_iter iter;
1358  int i = 0;
1359  Size size;
1360 
1361  size = sizeof(SnapshotData) +
1362  sizeof(TransactionId) * orig_snap->xcnt +
1363  sizeof(TransactionId) * (txn->nsubtxns + 1);
1364 
1365  snap = MemoryContextAllocZero(rb->context, size);
1366  memcpy(snap, orig_snap, sizeof(SnapshotData));
1367 
1368  snap->copied = true;
1369  snap->active_count = 1; /* mark as active so nobody frees it */
1370  snap->regd_count = 0;
1371  snap->xip = (TransactionId *) (snap + 1);
1372 
1373  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1374 
1375  /*
1376  * snap->subxip contains all txids that belong to our transaction which we
1377  * need to check via cmin/cmax. That's why we store the toplevel
1378  * transaction in there as well.
1379  */
1380  snap->subxip = snap->xip + snap->xcnt;
1381  snap->subxip[i++] = txn->xid;
1382 
1383  /*
1384  * txn->nsubtxns isn't decreased when subtransactions abort, so count manually.
1385  * Since it's an upper boundary it is safe to use it for the allocation
1386  * above.
1387  */
1388  snap->subxcnt = 1;
1389 
1390  dlist_foreach(iter, &txn->subtxns)
1391  {
1392  ReorderBufferTXN *sub_txn;
1393 
1394  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1395  snap->subxip[i++] = sub_txn->xid;
1396  snap->subxcnt++;
1397  }
1398 
1399  /* sort so we can bsearch() later */
1400  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1401 
1402  /* store the specified current CommandId */
1403  snap->curcid = cid;
1404 
1405  return snap;
1406 }
uint32 TransactionId
Definition: c.h:507
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:199
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:466
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:492
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2144 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

2145 {
2146  int i;
2147 
2148  for (i = 0; i < txn->ninvalidations; i++)
2149  LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2150 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1940 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1941 {
1942  ReorderBufferTXN *txn;
1943 
1944  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1945  false);
1946 
1947  /* unknown, nothing to forget */
1948  if (txn == NULL)
1949  return;
1950 
1951  /* cosmetic... */
1952  txn->final_lsn = lsn;
1953 
1954  /*
1955  * Process cache invalidation messages if there are any. Even if we're not
1956  * interested in the transaction's contents, it could have manipulated the
1957  * catalog and we need to update the caches according to that.
1958  */
1959  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1960  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1961  txn->invalidations);
1962  else
1963  Assert(txn->ninvalidations == 0);
1964 
1965  /* remove potential on-disk data, and deallocate */
1966  ReorderBufferCleanupTXN(rb, txn);
1967 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:732
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * rb)

Definition at line 292 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

293 {
294  MemoryContext context = rb->context;
295 
296  /*
297  * We free separately allocated data by entirely scrapping reorderbuffer's
298  * memory context.
299  */
300  MemoryContextDelete(context);
301 
302  /* Free disk space used by unconsumed reorder buffers */
303  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
304 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:609
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer * rb,
Snapshot  snap 
)
static

Definition at line 1412 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1413 {
1414  if (snap->copied)
1415  pfree(snap);
1416  else
1417  SnapBuildSnapDecRefcount(snap);
1418 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1056
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:433

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer * rb)

Definition at line 360 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

361 {
362  ReorderBufferChange *change;
363 
364  change = (ReorderBufferChange *)
365  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
366 
367  memset(change, 0, sizeof(ReorderBufferChange));
368  return change;
369 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * rb)

Definition at line 715 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

716 {
717  ReorderBufferTXN *txn;
718 
719  AssertTXNLsnOrder(rb);
720 
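721  if (dlist_is_empty(&rb->toplevel_by_lsn))
722  return NULL;
723 
724  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
725 
726  Assert(!txn->is_known_as_subxact);
727  Assert(txn->first_lsn != InvalidXLogRecPtr);
728  return txn;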
729 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:732
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 743 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

744 {
745  ReorderBufferTXN *txn;
746 
747  AssertTXNLsnOrder(rb);
748 
749  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
750  return InvalidTransactionId;
751 
752  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
753  &rb->txns_by_base_snapshot_lsn);
754  return txn->base_snapshot->xmin;
755 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * rb,
int  nrelids 
)

Definition at line 469 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

470 {
471  Oid *relids;
472  Size alloc_len;
473 
474  alloc_len = sizeof(Oid) * nrelids;
475 
476  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
477 
478  return relids;
479 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer * rb,
Size  tuple_len 
)

Definition at line 433 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

434 {
435  ReorderBufferTupleBuf *tuple;
436  Size alloc_len;
437 
438  alloc_len = tuple_len + SizeofHeapTupleHeader;
439 
440  tuple = (ReorderBufferTupleBuf *)
441  MemoryContextAlloc(rb->tup_context,
442  sizeof(ReorderBufferTupleBuf) +
443  MAXIMUM_ALIGNOF + alloc_len);
444  tuple->alloc_tuple_size = alloc_len;
445  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
446 
447  return tuple;
448 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer * rb)
static

Definition at line 310 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

311 {
312  ReorderBufferTXN *txn;
313 
314  txn = (ReorderBufferTXN *)
315  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
316 
317  memset(txn, 0, sizeof(ReorderBufferTXN));
318 
319  dlist_init(&txn->changes);
320  dlist_init(&txn->tuplecids);
321  dlist_init(&txn->subtxns);
322 
323  return txn;
324 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * rb,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 1976 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1978 {
1979  bool use_subtxn = IsTransactionOrTransactionBlock();
1980  int i;
1981 
1982  if (use_subtxn)
1983  BeginInternalSubTransaction("replay");
1984 
1985  /*
1986  * Force invalidations to happen outside of a valid transaction - that way
1987  * entries will just be marked as invalid without accessing the catalog.
1988  * That's advantageous because we don't need to setup the full state
1989  * necessary for catalog access.
1990  */
1991  if (use_subtxn)
1992  AbortCurrentTransaction();
1993 
1994  for (i = 0; i < ninvalidations; i++)
1995  LocalExecuteInvalidationMessage(&invalidations[i]);
1996 
1997  if (use_subtxn)
1998  RollbackAndReleaseCurrentSubTransaction();
1999 }
void AbortCurrentTransaction(void)
Definition: xact.c:3159
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4651
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4462
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4357
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 932 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

933 {
934  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
935  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
936  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
937 
938  if (pos_a < pos_b)
939  return 1;
940  else if (pos_a == pos_b)
941  return 0;
942  return -1;
943 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer * rb,
ReorderBufferIterTXNState * state 
)
static

Definition at line 1159 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1161 {
1162  int32 off;
1163 
1164  for (off = 0; off < state->nr_txns; off++)
1165  {
1166  if (state->entries[off].fd != -1)
1167  CloseTransientFile(state->entries[off].fd);
1168  }
1169 
1170  /* free memory we might have "leaked" in the last *Next call */
1171  if (!dlist_is_empty(&state->old_change))
1172  {
1173  ReorderBufferChange *change;
1174 
1175  change = dlist_container(ReorderBufferChange, node,
1176  dlist_pop_head_node(&state->old_change));
1177  ReorderBufferReturnChange(rb, change);
1178  Assert(dlist_is_empty(&state->old_change));
1179  }
1180 
1181  binaryheap_free(state->heap);
1182  pfree(state);
1183 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:346
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
int CloseTransientFile(int fd)
Definition: fd.c:2432
#define Assert(condition)
Definition: c.h:732
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 950 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

951 {
952  Size nr_txns = 0;
953  ReorderBufferIterTXNState *state;
954  dlist_iter cur_txn_i;
955  int32 off;
956 
957  /*
958  * Calculate the size of our heap: one element for every transaction that
959  * contains changes. (Besides the transactions already in the reorder
960  * buffer, we count the one we were directly passed.)
961  */
962  if (txn->nentries > 0)
963  nr_txns++;
964 
965  dlist_foreach(cur_txn_i, &txn->subtxns)
966  {
967  ReorderBufferTXN *cur_txn;
968 
969  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
970 
971  if (cur_txn->nentries > 0)
972  nr_txns++;
973  }
974 
975  /*
976  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
977  * need to allocate/build a heap then.
978  */
979 
980  /* allocate iteration state */
981  state = (ReorderBufferIterTXNState *)
982  MemoryContextAllocZero(rb->context,
983  sizeof(ReorderBufferIterTXNState) +
984  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
985 
986  state->nr_txns = nr_txns;
987  dlist_init(&state->old_change);
988 
989  for (off = 0; off < state->nr_txns; off++)
990  {
991  state->entries[off].fd = -1;
992  state->entries[off].segno = 0;
993  }
994 
995  /* allocate heap */
996  state->heap = binaryheap_allocate(state->nr_txns,
997  ReorderBufferIterCompare,
998  state);
999 
1000  /*
1001  * Now insert items into the binary heap, in an unordered fashion. (We
1002  * will run a heap assembly step at the end; this is more efficient.)
1003  */
1004 
1005  off = 0;
1006 
1007  /* add toplevel transaction if it contains changes */
1008  if (txn->nentries > 0)
1009  {
1010  ReorderBufferChange *cur_change;
1011 
1012  if (txn->serialized)
1013  {
1014  /* serialize remaining changes */
1015  ReorderBufferSerializeTXN(rb, txn);
1016  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1017  &state->entries[off].segno);
1018  }
1019 
1020  cur_change = dlist_head_element(ReorderBufferChange, node,
1021  &txn->changes);
1022 
1023  state->entries[off].lsn = cur_change->lsn;
1024  state->entries[off].change = cur_change;
1025  state->entries[off].txn = txn;
1026 
1027  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1028  }
1029 
1030  /* add subtransactions if they contain changes */
1031  dlist_foreach(cur_txn_i, &txn->subtxns)
1032  {
1033  ReorderBufferTXN *cur_txn;
1034 
1035  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1036 
1037  if (cur_txn->nentries > 0)
1038  {
1039  ReorderBufferChange *cur_change;
1040 
1041  if (cur_txn->serialized)
1042  {
1043  /* serialize remaining changes */
1044  ReorderBufferSerializeTXN(rb, cur_txn);
1045  ReorderBufferRestoreChanges(rb, cur_txn,
1046  &state->entries[off].fd,
1047  &state->entries[off].segno);
1048  }
1049  cur_change = dlist_head_element(ReorderBufferChange, node,
1050  &cur_txn->changes);
1051 
1052  state->entries[off].lsn = cur_change->lsn;
1053  state->entries[off].change = cur_change;
1054  state->entries[off].txn = cur_txn;
1055 
1056  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1057  }
1058  }
1059 
1060  /* assemble a valid binary heap */
1061  binaryheap_build(state->heap);
1062 
1063  return state;
1064 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:346
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:466
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer * rb,
ReorderBufferIterTXNState * state 
)
static

Definition at line 1073 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

1074 {
1075  ReorderBufferChange *change;
1076  ReorderBufferIterTXNEntry *entry;
1077  int32 off;
1078 
1079  /* nothing there anymore */
1080  if (state->heap->bh_size == 0)
1081  return NULL;
1082 
1083  off = DatumGetInt32(binaryheap_first(state->heap));
1084  entry = &state->entries[off];
1085 
1086  /* free memory we might have "leaked" in the previous *Next call */
1087  if (!dlist_is_empty(&state->old_change))
1088  {
1089  change = dlist_container(ReorderBufferChange, node,
1090  dlist_pop_head_node(&state->old_change));
1091  ReorderBufferReturnChange(rb, change);
1092  Assert(dlist_is_empty(&state->old_change));
1093  }
1094 
1095  change = entry->change;
1096 
1097  /*
1098  * update heap with information about which transaction has the next
1099  * relevant change in LSN order
1100  */
1101 
1102  /* there are in-memory changes */
1103  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1104  {
1105  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1106  ReorderBufferChange *next_change =
1107  dlist_container(ReorderBufferChange, node, next);
1108 
1109  /* txn stays the same */
1110  state->entries[off].lsn = next_change->lsn;
1111  state->entries[off].change = next_change;
1112 
1113  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1114  return change;
1115  }
1116 
1117  /* try to load changes from disk */
1118  if (entry->txn->nentries != entry->txn->nentries_mem)
1119  {
1120  /*
1121  * Ugly: restoring changes will reuse *Change records, thus delete the
1122  * current one from the per-tx list and only free in the next call.
1123  */
1124  dlist_delete(&change->node);
1125  dlist_push_tail(&state->old_change, &change->node);
1126 
1127  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1128  &state->entries[off].segno))
1129  {
1130  /* successfully restored changes from disk */
1131  ReorderBufferChange *next_change =
1132  dlist_head_element(ReorderBufferChange, node,
1133  &entry->txn->changes);
1134 
1135  elog(DEBUG2, "restored %u/%u changes from disk",
1136  (uint32) entry->txn->nentries_mem,
1137  (uint32) entry->txn->nentries);
1138 
1139  Assert(entry->txn->nentries_mem);
1140  /* txn stays the same */
1141  state->entries[off].lsn = next_change->lsn;
1142  state->entries[off].change = next_change;
1143  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1144 
1145  return change;
1146  }
1147  }
1148 
1149  /* ok, no changes there anymore, remove */
1150  binaryheap_remove_first(state->heap);
1151 
1152  return change;
1153 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:215
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:346
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:358
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:732
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:226
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2012 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2013 {
2014  /* many records won't have an xid assigned, centralize check here */
2015  if (xid != InvalidTransactionId)
2016  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2017 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange * change 
)

Definition at line 580 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

582 {
583  ReorderBufferTXN *txn;
584 
585  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
586 
587  change->lsn = lsn;
588  Assert(InvalidXLogRecPtr != lsn);
589  dlist_push_tail(&txn->changes, &change->node);
590  txn->nentries++;
591  txn->nentries_mem++;
592 
593  ReorderBufferCheckSerializeTXN(rb, txn);
594 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:732
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 600 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

604 {
605  if (transactional)
606  {
607  MemoryContext oldcontext;
608  ReorderBufferChange *change;
609 
610  Assert(xid != InvalidTransactionId);
611 
612  oldcontext = MemoryContextSwitchTo(rb->context);
613 
614  change = ReorderBufferGetChange(rb);
615  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
616  change->data.msg.prefix = pstrdup(prefix);
617  change->data.msg.message_size = message_size;
618  change->data.msg.message = palloc(message_size);
619  memcpy(change->data.msg.message, message, message_size);
620 
621  ReorderBufferQueueChange(rb, xid, lsn, change);
622 
623  MemoryContextSwitchTo(oldcontext);
624  }
625  else
626  {
627  ReorderBufferTXN *txn = NULL;
628  volatile Snapshot snapshot_now = snapshot;
629 
630  if (xid != InvalidTransactionId)
631  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
632 
633  /* setup snapshot to allow catalog access */
634  SetupHistoricSnapshot(snapshot_now, NULL);
635  PG_TRY();
636  {
637  rb->message(rb, txn, lsn, false, prefix, message_size, message);
638 
639  TeardownHistoricSnapshot(false);
640  }
641  PG_CATCH();
642  {
643  TeardownHistoricSnapshot(true);
644  PG_RE_THROW();
645  }
646  PG_END_TRY();
647  }
648 }
struct ReorderBufferChange::@101::@104 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:310
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
#define PG_RE_THROW()
Definition: elog.h:331
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:949
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:301
#define PG_END_TRY()
Definition: elog.h:317

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
char *  change 
)
static

Definition at line 2650 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2652 {
2653  ReorderBufferDiskChange *ondisk;
2654  ReorderBufferChange *change;
2655 
2656  ondisk = (ReorderBufferDiskChange *) data;
2657 
2658  change = ReorderBufferGetChange(rb);
2659 
2660  /* copy static part */
2661  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2662 
2663  data += sizeof(ReorderBufferDiskChange);
2664 
2665  /* restore individual stuff */
2666  switch (change->action)
2667  {
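2668  /* fall through these, they're all similar enough */
2669  case REORDER_BUFFER_CHANGE_INSERT:
2670  case REORDER_BUFFER_CHANGE_UPDATE:
2671  case REORDER_BUFFER_CHANGE_DELETE:
2672  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: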
2673  if (change->data.tp.oldtuple)
2674  {
2675  uint32 tuplelen = ((HeapTuple) data)->t_len;
2676 
2677  change->data.tp.oldtuple =
2678  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2679 
2680  /* restore ->tuple */
2681  memcpy(&change->data.tp.oldtuple->tuple, data,
2682  sizeof(HeapTupleData));
2683  data += sizeof(HeapTupleData);
2684 
2685  /* reset t_data pointer into the new tuplebuf */
2686  change->data.tp.oldtuple->tuple.t_data =
2687  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2688 
2689  /* restore tuple data itself */
2690  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2691  data += tuplelen;
2692  }
2693 
2694  if (change->data.tp.newtuple)
2695  {
2696  /* here, data might not be suitably aligned! */
2697  uint32 tuplelen;
2698 
2699  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2700  sizeof(uint32));
2701 
2702  change->data.tp.newtuple =
2703  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2704 
2705  /* restore ->tuple */
2706  memcpy(&change->data.tp.newtuple->tuple, data,
2707  sizeof(HeapTupleData));
2708  data += sizeof(HeapTupleData);
2709 
2710  /* reset t_data pointer into the new tuplebuf */
2711  change->data.tp.newtuple->tuple.t_data =
2712  ReorderBufferTupleBufData(change->data.tp.newtuple);
2713 
2714  /* restore tuple data itself */
2715  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2716  data += tuplelen;
2717  }
2718 
2719  break;
2720  case REORDER_BUFFER_CHANGE_MESSAGE:
2721  {
2722  Size prefix_size;
2723 
2724  /* read prefix */
2725  memcpy(&prefix_size, data, sizeof(Size));
2726  data += sizeof(Size);
2727  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2728  prefix_size);
2729  memcpy(change->data.msg.prefix, data, prefix_size);
2730  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2731  data += prefix_size;
2732 
2733  /* read the message */
2734  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2735  data += sizeof(Size);
2736  change->data.msg.message = MemoryContextAlloc(rb->context,
2737  change->data.msg.message_size);
2738  memcpy(change->data.msg.message, data,
2739  change->data.msg.message_size);
2740  data += change->data.msg.message_size;
2741 
2742  break;
2743  }
2744  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2745  {
2746  Snapshot oldsnap;
2747  Snapshot newsnap;
2748  Size size;
2749 
2750  oldsnap = (Snapshot) data;
2751 
2752  size = sizeof(SnapshotData) +
2753  sizeof(TransactionId) * oldsnap->xcnt +
2754  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2755 
2756  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2757 
2758  newsnap = change->data.snapshot;
2759 
2760  memcpy(newsnap, data, size);
2761  newsnap->xip = (TransactionId *)
2762  (((char *) newsnap) + sizeof(SnapshotData));
2763  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2764  newsnap->copied = true;
2765  break;
2766  }
2767  /* the base struct contains all the data, easy peasy */
2768  case REORDER_BUFFER_CHANGE_TRUNCATE:
2769  {
2770  Oid *relids;
2771 
2772  relids = ReorderBufferGetRelids(rb,
2773  change->data.truncate.nrelids);
2774  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2775  change->data.truncate.relids = relids;
2776 
2777  break;
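2778  }
2779  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2780  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2781  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: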
2782  break;
2783  }
2784 
2785  dlist_push_tail(&txn->changes, &change->node);
2786  txn->nentries_mem++;
2787 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:507
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:358
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:466
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:655
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int *  fd,
XLogSegNo *  segno 
)
static

Definition at line 2519 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

/*
 * Body of ReorderBufferRestoreChanges(): refill txn->changes from the
 * transaction's spill files.
 *
 * All changes currently held in memory for txn are freed first, then up to
 * max_changes_in_memory changes are read back, walking the per-segment spill
 * files from *segno up to the segment containing txn->final_lsn.  *fd and
 * *segno carry the read position across calls: *fd == -1 means no file is
 * open, and *segno == 0 means "first time in", in which case the starting
 * segment is derived from txn->first_lsn.
 *
 * Returns the number of changes restored by this call.
 *
 * NOTE(review): gaps in the listing's line numbers (e.g. 2526-2527, 2532,
 * 2562, 2592, 2610) are source lines missing from this extract.
 */
2521 {
2522  Size restored = 0;
2523  XLogSegNo last_segno;
2524  dlist_mutable_iter cleanup_iter;
2525 
2528 
2529  /* free current entries, so we have memory for more */
2530  dlist_foreach_modify(cleanup_iter, &txn->changes)
2531  {
2533  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2534 
2535  dlist_delete(&cleanup->node);
2536  ReorderBufferReturnChange(rb, cleanup);
2537  }
2538  txn->nentries_mem = 0;
2539  Assert(dlist_is_empty(&txn->changes));
2540 
 /* last segment that can contain spilled changes of this transaction */
2541  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2542 
2543  while (restored < max_changes_in_memory && *segno <= last_segno)
2544  {
2545  int readBytes;
2546  ReorderBufferDiskChange *ondisk;
2547 
2548  if (*fd == -1)
2549  {
2550  char path[MAXPGPATH];
2551 
2552  /* first time in */
2553  if (*segno == 0)
2554  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2555 
2556  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2557 
2558  /*
2559  * No need to care about TLIs here, only used during a single run,
2560  * so each LSN only maps to a specific WAL record.
2561  */
 /* NOTE(review): the start of the call that fills in "path" (line 2562) is missing here */
2563  *segno);
2564 
2565  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 /* no spill file exists for this segment: move on to the next one */
2566  if (*fd < 0 && errno == ENOENT)
2567  {
2568  *fd = -1;
2569  (*segno)++;
2570  continue;
2571  }
2572  else if (*fd < 0)
2573  ereport(ERROR,
2575  errmsg("could not open file \"%s\": %m",
2576  path)));
2577  }
2578 
2579  /*
2580  * Read the statically sized part of a change which has information
2581  * about the total size. If we couldn't read a record, we're at the
2582  * end of this file.
2583  */
2586  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2588 
2589  /* eof */
2590  if (readBytes == 0)
2591  {
 /* NOTE(review): line 2592 is missing; presumably it closes *fd -- confirm */
2593  *fd = -1;
2594  (*segno)++;
2595  continue;
2596  }
2597  else if (readBytes < 0)
2598  ereport(ERROR,
2600  errmsg("could not read from reorderbuffer spill file: %m")));
2601  else if (readBytes != sizeof(ReorderBufferDiskChange))
2602  ereport(ERROR,
2604  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2605  readBytes,
2606  (uint32) sizeof(ReorderBufferDiskChange))));
2607 
2608  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2609 
 /*
  * NOTE(review): line 2610 is missing; presumably a call to
  * ReorderBufferSerializeReserve() that may move rb->outbuf, which is why
  * ondisk is fetched again right after it -- confirm.
  */
2611  sizeof(ReorderBufferDiskChange) + ondisk->size);
2612  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2613 
 /* ondisk->size includes the header, so only the remainder is read here */
2615  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2616  ondisk->size - sizeof(ReorderBufferDiskChange));
2618 
2619  if (readBytes < 0)
2620  ereport(ERROR,
2622  errmsg("could not read from reorderbuffer spill file: %m")));
2623  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2624  ereport(ERROR,
2626  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2627  readBytes,
2628  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2629 
2630  /*
2631  * ok, read a full change from disk, now restore it into proper
2632  * in-memory format
2633  */
2634  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2635  restored++;
2636  }
2637 
2638  return restored;
2639 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2255
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:593
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:358
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2432
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:901
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:784
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:13
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 2793 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN().

/*
 * Body of ReorderBufferRestoreCleanup(): remove the spill files of txn.
 *
 * Every WAL segment number from the one containing txn->first_lsn through
 * the one containing txn->final_lsn is visited and the corresponding path is
 * unlinked.  A file that does not exist (ENOENT) is silently skipped; any
 * other unlink() failure raises ERROR.
 *
 * NOTE(review): lines 2799-2800, 2810 (the statement that fills in "path")
 * and 2813 are missing from this extract.
 */
2794 {
2795  XLogSegNo first;
2796  XLogSegNo cur;
2797  XLogSegNo last;
2798 
2801 
2802  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2803  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2804 
2805  /* iterate over all possible filenames, and delete them */
2806  for (cur = first; cur <= last; cur++)
2807  {
2808  char path[MAXPGPATH];
2809 
2811  if (unlink(path) != 0 && errno != ENOENT)
2812  ereport(ERROR,
2814  errmsg("could not remove file \"%s\": %m", path)));
2815  }
2816 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:112
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:593
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer *  rb,
ReorderBufferChange *  change 
)

Definition at line 375 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

/*
 * Body of ReorderBufferReturnChange(): free a change and everything it owns.
 *
 * Depending on change->action, the data hanging off the change is released
 * first (old/new tuple buffers, message prefix and body, snapshot, or the
 * array of truncated relation OIDs), each pointer being reset to NULL after
 * it is freed.  Finally the change struct itself is pfree'd, so the caller
 * must not touch it afterwards.
 *
 * NOTE(review): the "case" labels of the switch (lines 380-383, 396, 404,
 * 412, 419-421) are missing from this extract, so which action values select
 * which branch cannot be read off this listing.
 */
376 {
377  /* free contained data */
378  switch (change->action)
379  {
 /* tuple-carrying changes: release both tuple buffers, if present */
384  if (change->data.tp.newtuple)
385  {
386  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
387  change->data.tp.newtuple = NULL;
388  }
389 
390  if (change->data.tp.oldtuple)
391  {
392  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
393  change->data.tp.oldtuple = NULL;
394  }
395  break;
 /* message changes: prefix and message body are separate allocations */
397  if (change->data.msg.prefix != NULL)
398  pfree(change->data.msg.prefix);
399  change->data.msg.prefix = NULL;
400  if (change->data.msg.message != NULL)
401  pfree(change->data.msg.message);
402  change->data.msg.message = NULL;
403  break;
 /* snapshot changes */
405  if (change->data.snapshot)
406  {
407  ReorderBufferFreeSnap(rb, change->data.snapshot);
408  change->data.snapshot = NULL;
409  }
410  break;
411  /* no data in addition to the struct itself */
413  if (change->data.truncate.relids != NULL)
414  {
415  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
416  change->data.truncate.relids = NULL;
417  }
418  break;
422  break;
423  }
424 
425  pfree(change);
426 }
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
void pfree(void *pointer)
Definition: mcxt.c:1056
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
union ReorderBufferChange::@101 data
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer *  rb,
Oid *  relids 
)

Definition at line 485 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

/*
 * Body of ReorderBufferReturnRelids(): release a relids array by handing it
 * straight to pfree().  rb is not used in this body.
 */
486 {
487  pfree(relids);
488 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer *  rb,
ReorderBufferTupleBuf *  tuple 
)

Definition at line 454 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

/*
 * Body of ReorderBufferReturnTupleBuf(): release a tuple buffer by handing
 * it straight to pfree().  rb is not used in this body.
 */
455 {
456  pfree(tuple);
457 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 330 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

/*
 * Body of ReorderBufferReturnTXN(): free a ReorderBufferTXN.
 *
 * If txn is the entry remembered in rb's one-element lookup cache
 * (by_txn_last_xid / by_txn_last_txn), the cache is cleared so it cannot
 * dangle.  The tuplecid hash and the invalidation message array are released
 * if present, and finally txn itself is pfree'd.
 *
 * NOTE(review): lines 335 and 343 are missing from this extract; judging by
 * the References list they presumably reset rb->by_txn_last_xid to
 * InvalidTransactionId and call hash_destroy() on txn->tuplecid_hash --
 * confirm against the real source.
 */
331 {
332  /* clean the lookup cache if we were cached (quite likely) */
333  if (rb->by_txn_last_xid == txn->xid)
334  {
336  rb->by_txn_last_txn = NULL;
337  }
338 
339  /* free data that's contained */
340 
341  if (txn->tuplecid_hash != NULL)
342  {
344  txn->tuplecid_hash = NULL;
345  }
346 
347  if (txn->invalidations)
348  {
349  pfree(txn->invalidations);
350  txn->invalidations = NULL;
351  }
352 
353  pfree(txn);
354 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1056
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  fd,
ReorderBufferChange *  change 
)
static

Definition at line 2331 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

/*
 * Body of ReorderBufferSerializeChange(): write one change to the spill file
 * open on fd.
 *
 * The on-disk record is assembled in rb->outbuf: a ReorderBufferDiskChange
 * header holding a raw copy of the ReorderBufferChange, followed by whatever
 * out-of-line data the change points to (tuples, message prefix and body,
 * snapshot xid arrays, or truncated relation OIDs).  ondisk->size is set to
 * the total record length including the header, and the whole buffer is
 * written with a single write().  A short or failed write raises ERROR; when
 * write() leaves errno unset, ENOSPC is assumed.
 *
 * txn is only used for the XID in the error message.
 *
 * NOTE(review): the "case" labels of the switch and the statements that size
 * rb->outbuf (lines 2337, 2374, 2406, 2442, 2475) are missing from this
 * extract; the latter are presumably calls to ReorderBufferSerializeReserve()
 * -- confirm against the real source.
 */
2333 {
2334  ReorderBufferDiskChange *ondisk;
2335  Size sz = sizeof(ReorderBufferDiskChange);
2336 
2338 
2339  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 /* raw struct copy: embedded pointers are not meaningful in the file */
2340  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2341 
2342  switch (change->action)
2343  {
2344  /* fall through these, they're all similar enough */
2349  {
2350  char *data;
2351  ReorderBufferTupleBuf *oldtup,
2352  *newtup;
2353  Size oldlen = 0;
2354  Size newlen = 0;
2355 
2356  oldtup = change->data.tp.oldtuple;
2357  newtup = change->data.tp.newtuple;
2358 
 /* each present tuple contributes its HeapTupleData plus t_len data bytes */
2359  if (oldtup)
2360  {
2361  sz += sizeof(HeapTupleData);
2362  oldlen = oldtup->tuple.t_len;
2363  sz += oldlen;
2364  }
2365 
2366  if (newtup)
2367  {
2368  sz += sizeof(HeapTupleData);
2369  newlen = newtup->tuple.t_len;
2370  sz += newlen;
2371  }
2372 
2373  /* make sure we have enough space */
2375 
2376  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2377  /* might have been reallocated above */
2378  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2379 
 /*
  * NOTE(review): space was accounted for when the tuple pointer is
  * non-NULL, but the copy is gated on the length being non-zero.
  */
2380  if (oldlen)
2381  {
2382  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2383  data += sizeof(HeapTupleData);
2384 
2385  memcpy(data, oldtup->tuple.t_data, oldlen);
2386  data += oldlen;
2387  }
2388 
2389  if (newlen)
2390  {
2391  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2392  data += sizeof(HeapTupleData);
2393 
2394  memcpy(data, newtup->tuple.t_data, newlen);
2395  data += newlen;
2396  }
2397  break;
2398  }
2400  {
2401  char *data;
 /* prefix is stored with its terminating NUL */
2402  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2403 
 /* layout: prefix_size, prefix bytes, message_size, message bytes */
2404  sz += prefix_size + change->data.msg.message_size +
2405  sizeof(Size) + sizeof(Size);
2407 
2408  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2409 
2410  /* might have been reallocated above */
2411  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2412 
2413  /* write the prefix including the size */
2414  memcpy(data, &prefix_size, sizeof(Size));
2415  data += sizeof(Size);
2416  memcpy(data, change->data.msg.prefix,
2417  prefix_size);
2418  data += prefix_size;
2419 
2420  /* write the message including the size */
2421  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2422  data += sizeof(Size);
2423  memcpy(data, change->data.msg.message,
2424  change->data.msg.message_size);
2425  data += change->data.msg.message_size;
2426 
2427  break;
2428  }
2430  {
2431  Snapshot snap;
2432  char *data;
2433 
2434  snap = change->data.snapshot;
2435 
 /* layout: SnapshotData struct, then the xip array, then the subxip array */
2436  sz += sizeof(SnapshotData) +
2437  sizeof(TransactionId) * snap->xcnt +
2438  sizeof(TransactionId) * snap->subxcnt
2439  ;
2440 
2441  /* make sure we have enough space */
2443  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2444  /* might have been reallocated above */
2445  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2446 
2447  memcpy(data, snap, sizeof(SnapshotData));
2448  data += sizeof(SnapshotData);
2449 
2450  if (snap->xcnt)
2451  {
2452  memcpy(data, snap->xip,
2453  sizeof(TransactionId) * snap->xcnt);
2454  data += sizeof(TransactionId) * snap->xcnt;
2455  }
2456 
2457  if (snap->subxcnt)
2458  {
2459  memcpy(data, snap->subxip,
2460  sizeof(TransactionId) * snap->subxcnt);
2461  data += sizeof(TransactionId) * snap->subxcnt;
2462  }
2463  break;
2464  }
2466  {
2467  Size size;
2468  char *data;
2469 
2470  /* account for the OIDs of truncated relations */
2471  size = sizeof(Oid) * change->data.truncate.nrelids;
2472  sz += size;
2473 
2474  /* make sure we have enough space */
2476 
2477  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2478  /* might have been reallocated above */
2479  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2480 
2481  memcpy(data, change->data.truncate.relids, size);
2482  data += size;
2483 
2484  break;
2485  }
2489  /* ReorderBufferChange contains everything important */
2490  break;
2491  }
2492 
 /* total record length, header included */
2493  ondisk->size = sz;
2494 
 /* cleared so that a short write that does not set errno can be detected below */
2495  errno = 0;
2497  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2498  {
2499  int save_errno = errno;
2500 
2502 
2503  /* if write didn't set errno, assume problem is no disk space */
2504  errno = save_errno ? save_errno : ENOSPC;
2505  ereport(ERROR,
2507  errmsg("could not write to data file for XID %u: %m",
2508  txn->xid)));
2509  }
2511 
2512  Assert(ondisk->change.action == change->action);
2513 }
uint32 TransactionId
Definition: c.h:507
struct ReorderBufferChange::@101::@102 tp
#define write(a, b, c)
Definition: win32.h:14
struct ReorderBufferChange::@101::@104 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:68
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:593
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId * xip
Definition: snapshot.h:168
int CloseTransientFile(int fd)
Definition: fd.c:2432
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:784
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:180
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot *  slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 2862 of file reorderbuffer.c.

References ReplicationSlot::data, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

/*
 * Body of ReorderBufferSerializedPath(): build the spill file name for
 * transaction xid and WAL segment segno.
 *
 * segno is converted to the LSN at offset 0 of that segment, and the result
 * is formatted into path (at most MAXPGPATH bytes) as
 * "pg_replslot/<name>/xid-<xid>-lsn-<hi>-<lo>.spill", where hi/lo are the
 * upper and lower 32 bits of that LSN in hexadecimal.
 *
 * NOTE(review): line 2870, which supplies the "%s" argument, is missing from
 * this extract; judging by the References list it is presumably the slot
 * name via NameStr() -- confirm whether it uses the slot parameter or
 * MyReplicationSlot.
 */
2864 {
2865  XLogRecPtr recptr;
2866 
2867  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2868 
2869  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2871  xid,
2872  (uint32) (recptr >> 32), (uint32) recptr);
2873 }
int wal_segment_size
Definition: xlog.c:112
ReplicationSlotPersistentData data
Definition: slot.h:132
#define MAXPGPATH
unsigned int uint32
Definition: c.h:358
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define NameStr(name)
Definition: c.h:609
#define snprintf
Definition: port.h:192

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer *  rb,
Size  sz 
)
static

Definition at line 2218 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

/*
 * Body of ReorderBufferSerializeReserve(): make sure rb->outbuf can hold at
 * least sz bytes.
 *
 * If no buffer has been set up yet (outbufsize == 0) one of exactly sz bytes
 * is allocated in rb->context; if the existing buffer is smaller than sz it
 * is enlarged with repalloc().  A buffer that is already large enough is left
 * alone, so the buffer only ever grows.
 *
 * rb->outbuf is reassigned from repalloc()'s return value, so callers must
 * re-read rb->outbuf after calling this rather than keep an old pointer.
 */
2219 {
2220  if (!rb->outbufsize)
2221  {
2222  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2223  rb->outbufsize = sz;
2224  }
2225  else if (rb->outbufsize < sz)
2226  {
2227  rb->outbuf = repalloc(rb->outbuf, sz);
2228  rb->outbufsize = sz;
2229  }
2230 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 2253 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

/*
 * Body of ReorderBufferSerializeTXN(): spill all in-memory changes of txn,
 * and recursively of its subtransactions, to disk.
 *
 * Changes are appended to one file per WAL segment, chosen by the segment
 * containing the change's lsn; the file is (re)opened whenever the next
 * change falls outside the currently open segment.  Each change is unlinked
 * from txn->changes and freed once written.  Afterwards nentries_mem is reset
 * to 0 and txn->serialized is set to true.
 *
 * NOTE(review): lines 2298 (start of the call that fills in "path") and 2307
 * are missing from this extract.
 */
2254 {
2255  dlist_iter subtxn_i;
2256  dlist_mutable_iter change_i;
2257  int fd = -1;
2258  XLogSegNo curOpenSegNo = 0;
2259  Size spilled = 0;
2260 
2261  elog(DEBUG2, "spill %u changes in XID %u to disk",
2262  (uint32) txn->nentries_mem, txn->xid);
2263 
2264  /* do the same to all child TXs */
2265  dlist_foreach(subtxn_i, &txn->subtxns)
2266  {
2267  ReorderBufferTXN *subtxn;
2268 
2269  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2270  ReorderBufferSerializeTXN(rb, subtxn);
2271  }
2272 
2273  /* serialize changestream */
 /* the "modify" iterator is needed because each change is deleted inside the loop */
2274  dlist_foreach_modify(change_i, &txn->changes)
2275  {
2276  ReorderBufferChange *change;
2277 
2278  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2279 
2280  /*
2281  * store in segment in which it belongs by start lsn, don't split over
2282  * multiple segments tho
2283  */
2284  if (fd == -1 ||
2285  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2286  {
2287  char path[MAXPGPATH];
2288 
2289  if (fd != -1)
2290  CloseTransientFile(fd);
2291 
2292  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2293 
2294  /*
2295  * No need to care about TLIs here, only used during a single run,
2296  * so each LSN only maps to a specific WAL record.
2297  */
2299  curOpenSegNo);
2300 
2301  /* open segment, create it if necessary */
2302  fd = OpenTransientFile(path,
2303  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2304 
2305  if (fd < 0)
2306  ereport(ERROR,
2308  errmsg("could not open file \"%s\": %m", path)));
2309  }
2310 
2311  ReorderBufferSerializeChange(rb, txn, fd, change);
2312  dlist_delete(&change->node);
2313  ReorderBufferReturnChange(rb, change);
2314 
2315  spilled++;
2316  }
2317 
 /* every in-memory change must have been written out */
2318  Assert(spilled == txn->nentries_mem);
2319  Assert(dlist_is_empty(&txn->changes));
2320  txn->nentries_mem = 0;
2321  txn->serialized = true;
2322 
2323  if (fd != -1)
2324  CloseTransientFile(fd);
2325 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2255
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:593
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:358
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2432
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:732
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:466
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2043 of file reorderbuffer.c.

References Assert, AssertArg, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

/*
 * Body of ReorderBufferSetBaseSnapshot(): attach snap as the base snapshot
 * of the transaction identified by xid.
 *
 * snap must not be NULL.  The transaction is looked up, and created if it
 * does not exist yet; if it is known to be a subtransaction, the snapshot is
 * attached to its top-level transaction instead.  The target must not
 * already have a base snapshot (asserted).  lsn is recorded as
 * base_snapshot_lsn.
 *
 * NOTE(review): line 2063 is missing from this extract; judging by the
 * References list it presumably links the transaction into
 * rb->txns_by_base_snapshot_lsn -- confirm against the real source.
 */
2045 {
2046  ReorderBufferTXN *txn;
2047  bool is_new;
2048 
2049  AssertArg(snap != NULL);
2050 
2051  /*
2052  * Fetch the transaction to operate on. If we know it's a subtransaction,
2053  * operate on its top-level transaction instead.
2054  */
2055  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2056  if (txn->is_known_as_subxact)
2057  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2058  NULL, InvalidXLogRecPtr, false);
2059  Assert(txn->base_snapshot == NULL);
2060 
2061  txn->base_snapshot = snap;
2062  txn->base_snapshot_lsn = lsn;
2064 
2065  AssertTXNLsnOrder(rb);
2066 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node base_snapshot_node
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
dlist_head txns_by_base_snapshot_lsn
#define AssertArg(condition)
Definition: c.h:734
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:732
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TransactionId toplevel_xid

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer *  rb,
XLogRecPtr  ptr 
)

Definition at line 758 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

/*
 * Body of ReorderBufferSetRestartPoint().
 *
 * NOTE(review): the only statement (line 760) is missing from this extract.
 * Judging by the References list it presumably stores ptr in
 * rb->current_restart_decoding_lsn -- confirm against the real source.
 */
759 {
761 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change 
)
static

Definition at line 2935 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

/*
 * Body of ReorderBufferToastAppendChunk(): record one TOAST chunk in the
 * transaction's per-chunk-id hash.
 *
 * relation must be a TOAST relation (asserted).  The first three attributes
 * of the change's new tuple are read as chunk id, chunk sequence number and
 * chunk data; none of them may be NULL (asserted).  Chunks are collected per
 * chunk id in txn->toast_hash, which is created on first use, and must arrive
 * in order: sequence 0 for a new entry, then exactly last_chunk_seq + 1,
 * otherwise ERROR is raised.
 *
 * On success the entry's accumulated size, last sequence number and chunk
 * count are updated and the change is linked into the entry's chunk list via
 * change->node.
 */
2937 {
2938  ReorderBufferToastEnt *ent;
2939  ReorderBufferTupleBuf *newtup;
2940  bool found;
2941  int32 chunksize;
2942  bool isnull;
2943  Pointer chunk;
2944  TupleDesc desc = RelationGetDescr(relation);
2945  Oid chunk_id;
2946  int32 chunk_seq;
2947 
2948  if (txn->toast_hash == NULL)
2949  ReorderBufferToastInitHash(rb, txn);
2950 
2951  Assert(IsToastRelation(relation));
2952 
2953  newtup = change->data.tp.newtuple;
2954  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2955  Assert(!isnull);
2956  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2957  Assert(!isnull);
2958 
 /* find the entry for this chunk id, creating it if this is the first chunk seen */
2959  ent = (ReorderBufferToastEnt *)
2960  hash_search(txn->toast_hash,
2961  (void *) &chunk_id,
2962  HASH_ENTER,
2963  &found);
2964 
2965  if (!found)
2966  {
2967  Assert(ent->chunk_id == chunk_id);
2968  ent->num_chunks = 0;
2969  ent->last_chunk_seq = 0;
2970  ent->size = 0;
2971  ent->reconstructed = NULL;
2972  dlist_init(&ent->chunks);
2973 
2974  if (chunk_seq != 0)
2975  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2976  chunk_seq, chunk_id);
2977  }
2978  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2979  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2980  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2981 
2982  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2983  Assert(!isnull);
2984 
2985  /* calculate size so we can allocate the right size at once later */
 /* payload length = varlena size minus its header, for either header form */
2986  if (!VARATT_IS_EXTENDED(chunk))
2987  chunksize = VARSIZE(chunk) - VARHDRSZ;
2988  else if (VARATT_IS_SHORT(chunk))
2989  /* could happen due to heap_form_tuple doing its thing */
2990  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2991  else
2992  elog(ERROR, "unexpected type of toast chunk");
2993 
2994  ent->size += chunksize;
2995  ent->last_chunk_seq = chunk_seq;
2996  ent->num_chunks++;
2997  dlist_push_tail(&ent->chunks, &change->node);
2998 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:142
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:445
struct ReorderBufferChange::@101::@102 tp
#define VARHDRSZ_SHORT
Definition: postgres.h:268
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARHDRSZ
Definition: c.h:555
#define DatumGetObjectId(X)
Definition: postgres.h:500
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:346
char * Pointer
Definition: c.h:335
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define DatumGetPointer(X)
Definition: postgres.h:549
#define elog(elevel,...)
Definition: elog.h:226
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2914 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2915 {
2916  HASHCTL hash_ctl;
2917 
2918  Assert(txn->toast_hash == NULL);
2919 
2920  memset(&hash_ctl, 0, sizeof(hash_ctl));
2921  hash_ctl.keysize = sizeof(Oid);
2922  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2923  hash_ctl.hcxt = rb->context;
2924  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2925  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2926 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:732

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 3008 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, elog, ERROR, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), RelationIsValid, ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

3010 {
3011  TupleDesc desc;
3012  int natt;
3013  Datum *attrs;
3014  bool *isnull;
3015  bool *free;
3016  HeapTuple tmphtup;
3017  Relation toast_rel;
3018  TupleDesc toast_desc;
3019  MemoryContext oldcontext;
3020  ReorderBufferTupleBuf *newtup;
3021 
3022  /* no toast tuples changed */
3023  if (txn->toast_hash == NULL)
3024  return;
3025 
3026  oldcontext = MemoryContextSwitchTo(rb->context);
3027 
3028  /* we should only have toast tuples in an INSERT or UPDATE */
3029  Assert(change->data.tp.newtuple);
3030 
3031  desc = RelationGetDescr(relation);
3032 
3033  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3034  if (!RelationIsValid(toast_rel))
3035  elog(ERROR, "could not open relation with OID %u",
3036  relation->rd_rel->reltoastrelid);
3037 
3038  toast_desc = RelationGetDescr(toast_rel);
3039 
3040  /* should we allocate from stack instead? */
3041  attrs = palloc0(sizeof(Datum) * desc->natts);
3042  isnull = palloc0(sizeof(bool) * desc->natts);
3043  free = palloc0(sizeof(bool) * desc->natts);
3044 
3045  newtup = change->data.tp.newtuple;
3046 
3047  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3048 
3049  for (natt = 0; natt < desc->natts; natt++)
3050  {
3051  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3052  ReorderBufferToastEnt *ent;
3053  struct varlena *varlena;
3054 
3055  /* va_rawsize is the size of the original datum -- including header */
3056  struct varatt_external toast_pointer;
3057  struct varatt_indirect redirect_pointer;
3058  struct varlena *new_datum = NULL;
3059  struct varlena *reconstructed;
3060  dlist_iter it;
3061  Size data_done = 0;
3062 
3063  /* system columns aren't toasted */
3064  if (attr->attnum < 0)
3065  continue;
3066 
3067  if (attr->attisdropped)
3068  continue;
3069 
3070  /* not a varlena datatype */
3071  if (attr->attlen != -1)
3072  continue;
3073 
3074  /* no data */
3075  if (isnull[natt])
3076  continue;
3077 
3078  /* ok, we know we have a toast datum */
3079  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3080 
3081  /* no need to do anything if the tuple isn't external */
3082  if (!VARATT_IS_EXTERNAL(varlena))
3083  continue;
3084 
3085  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3086 
3087  /*
3088  * Check whether the toast tuple changed, replace if so.
3089  */
3090  ent = (ReorderBufferToastEnt *)
3091  hash_search(txn->toast_hash,
3092  (void *) &toast_pointer.va_valueid,
3093  HASH_FIND,
3094  NULL);
3095  if (ent == NULL)
3096  continue;
3097 
3098  new_datum =
3099  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3100 
3101  free[natt] = true;
3102 
3103  reconstructed = palloc0(toast_pointer.va_rawsize);
3104 
3105  ent->reconstructed = reconstructed;
3106 
3107  /* stitch toast tuple back together from its parts */
3108  dlist_foreach(it, &ent->chunks)
3109  {
3110  bool isnull;
3111  ReorderBufferChange *cchange;
3112  ReorderBufferTupleBuf *ctup;
3113  Pointer chunk;
3114 
3115  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3116  ctup = cchange->data.tp.newtuple;
3117  chunk = DatumGetPointer(
3118  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3119 
3120  Assert(!isnull);
3121  Assert(!VARATT_IS_EXTERNAL(chunk));
3122  Assert(!VARATT_IS_SHORT(chunk));
3123 
3124  memcpy(VARDATA(reconstructed) + data_done,
3125  VARDATA(chunk),
3126  VARSIZE(chunk) - VARHDRSZ);
3127  data_done += VARSIZE(chunk) - VARHDRSZ;
3128  }
3129  Assert(data_done == toast_pointer.va_extsize);
3130 
3131  /* make sure it's marked as compressed or not */
3132  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3133  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3134  else
3135  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3136 
3137  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3138  redirect_pointer.pointer = reconstructed;
3139 
3140  SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3141  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3142  sizeof(redirect_pointer));
3143 
3144  attrs[natt] = PointerGetDatum(new_datum);
3145  }
3146 
3147  /*
3148  * Build tuple in separate memory & copy tuple back into the tuplebuf
3149  * passed to the output plugin. We can't directly heap_fill_tuple() into
3150  * the tuplebuf because attrs[] will point back into the current content.
3151  */
3152  tmphtup = heap_form_tuple(desc, attrs, isnull);
3153  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3154  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3155 
3156  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3157  newtup->tuple.t_len = tmphtup->t_len;
3158 
3159  /*
3160  * Free resources we no longer need; more persistent stuff will be
3161  * freed in ReorderBufferToastReset().
3162  */
3163  RelationClose(toast_rel);
3164  pfree(tmphtup);
3165  for (natt = 0; natt < desc->natts; natt++)
3166  {
3167  if (free[natt])
3168  pfree(DatumGetPointer(attrs[natt]));
3169  }
3170  pfree(attrs);
3171  pfree(free);
3172  pfree(isnull);
3173 
3174  MemoryContextSwitchTo(oldcontext);
3175 }
#define VARDATA(PTR)
Definition: postgres.h:302
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define RelationGetDescr(relation)
Definition: rel.h:445
struct ReorderBufferChange::@101::@102 tp
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:32
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:555
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Form_pg_class rd_rel
Definition: rel.h:83
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
char * Pointer
Definition: c.h:335
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
#define RelationIsValid(relation)
Definition: rel.h:392
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void RelationClose(Relation relation)
Definition: relcache.c:2089
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:44
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:65
#define Assert(condition)
Definition: c.h:732
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:466
#define DatumGetPointer(X)
Definition: postgres.h:549
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
#define elog(elevel,...)
Definition: elog.h:226
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: detoast.h:22
Definition: c.h:549
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1990

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3181 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

3182 {
3183  HASH_SEQ_STATUS hstat;
3184  ReorderBufferToastEnt *ent;
3185 
3186  if (txn->toast_hash == NULL)
3187  return;
3188 
3189  /* sequentially walk over the hash and free everything */
3190  hash_seq_init(&hstat, txn->toast_hash);
3191  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3192  {
3193  dlist_mutable_iter it;
3194 
3195  if (ent->reconstructed != NULL)
3196  pfree(ent->reconstructed);
3197 
3198  dlist_foreach_modify(it, &ent->chunks)
3199  {
3200  ReorderBufferChange *change =
3201  dlist_container(ReorderBufferChange, node, it.cur);
3202 
3203  dlist_delete(&change->node);
3204  ReorderBufferReturnChange(rb, change);
3205  }
3206  }
3207 
3208  hash_destroy(txn->toast_hash);
3209  txn->toast_hash = NULL;
3210 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN txn,
ReorderBufferTXN subtxn 
)
static

Definition at line 836 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild().

838 {
839  Assert(subtxn->toplevel_xid == txn->xid);
840 
841  if (subtxn->base_snapshot != NULL)
842  {
843  if (txn->base_snapshot == NULL ||
844  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
845  {
846  /*
847  * If the toplevel transaction already has a base snapshot but
848  * it's newer than the subxact's, purge it.
849  */
850  if (txn->base_snapshot != NULL)
851  {
852  SnapBuildSnapDecRefcount(txn->base_snapshot);
853  dlist_delete(&txn->base_snapshot_node);
854  }
855 
856  /*
857  * The snapshot is now the top transaction's; transfer it, and
858  * adjust the list position of the top transaction in the list by
859  * moving it to where the subtransaction is.
860  */
861  txn->base_snapshot = subtxn->base_snapshot;
862  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
863  dlist_insert_before(&subtxn->base_snapshot_node,
864  &txn->base_snapshot_node);
865 
866  /*
867  * The subtransaction doesn't have a snapshot anymore (so it
868  * mustn't be in the list.)
869  */
870  subtxn->base_snapshot = NULL;
871  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
872  dlist_delete(&subtxn->base_snapshot_node);
873  }
874  else
875  {
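876  /* Base snap of toplevel is fine, so subxact's is not needed */
877  SnapBuildSnapDecRefcount(subtxn->base_snapshot);
878  dlist_delete(&subtxn->base_snapshot_node);
879  subtxn->base_snapshot = NULL;
880  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
881  }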
882  }
883 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node base_snapshot_node
XLogRecPtr base_snapshot_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:346
TransactionId xid
#define Assert(condition)
Definition: c.h:732
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:433
TransactionId toplevel_xid

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 497 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXNByIdEnt::xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

499 {
500  ReorderBufferTXN *txn;
501  ReorderBufferTXNByIdEnt *ent;
502  bool found;
503 
504  Assert(TransactionIdIsValid(xid));
505 
506  /*
507  * Check the one-entry lookup cache first
508  */
509  if (TransactionIdIsValid(rb->by_txn_last_xid) &&
510  rb->by_txn_last_xid == xid)
511  {
512  txn = rb->by_txn_last_txn;
513 
514  if (txn != NULL)
515  {
516  /* found it, and it's valid */
517  if (is_new)
518  *is_new = false;
519  return txn;
520  }
521 
522  /*
523  * cached as non-existent, and asked not to create? Then nothing else
524  * to do.
525  */
526  if (!create)
527  return NULL;
528  /* otherwise fall through to create it */
529  }
530 
531  /*
532  * The cache wasn't hit, or it yielded a "does-not-exist" and we want
533  * to create an entry.
534  */
535 
536  /* search the lookup table */
537  ent = (ReorderBufferTXNByIdEnt *)
538  hash_search(rb->by_txn,
539  (void *) &xid,
540  create ? HASH_ENTER : HASH_FIND,
541  &found);
542  if (found)
543  txn = ent->txn;
544  else if (create)
545  {
546  /* initialize the new entry, if creation was requested */
547  Assert(ent != NULL);
548  Assert(lsn != InvalidXLogRecPtr);
549 
550  ent->txn = ReorderBufferGetTXN(rb);
551  ent->txn->xid = xid;
552  txn = ent->txn;
553  txn->first_lsn = lsn;
554  txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
555 
556  if (create_as_top)
557  {
558  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
559  AssertTXNLsnOrder(rb);
560  }
561  }
562  else
563  txn = NULL; /* not found and not asked to create */
564 
565  /* update cache */
566  rb->by_txn_last_xid = xid;
567  rb->by_txn_last_txn = txn;
568 
569  if (is_new)
570  *is_new = !found;
571 
572  Assert(!create || txn != NULL);
573  return txn;
574 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c: