PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNStateReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const void *a_p, const void *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3004 of file reorderbuffer.c.

References Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3005 {
3006  char path[MAXPGPATH];
3007  int fd;
3008  int readBytes;
3010 
3011  sprintf(path, "pg_logical/mappings/%s", fname);
3012  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3013  if (fd < 0)
3014  ereport(ERROR,
3016  errmsg("could not open file \"%s\": %m", path)));
3017 
3018  while (true)
3019  {
3022  ReorderBufferTupleCidEnt *new_ent;
3023  bool found;
3024 
3025  /* be careful about padding */
3026  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3027 
3028  /* read all mappings till the end of the file */
3030  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3032 
3033  if (readBytes < 0)
3034  ereport(ERROR,
3036  errmsg("could not read file \"%s\": %m",
3037  path)));
3038  else if (readBytes == 0) /* EOF */
3039  break;
3040  else if (readBytes != sizeof(LogicalRewriteMappingData))
3041  ereport(ERROR,
3043  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3044  path, readBytes,
3045  (int32) sizeof(LogicalRewriteMappingData))));
3046 
3047  key.relnode = map.old_node;
3048  ItemPointerCopy(&map.old_tid,
3049  &key.tid);
3050 
3051 
3052  ent = (ReorderBufferTupleCidEnt *)
3053  hash_search(tuplecid_data,
3054  (void *) &key,
3055  HASH_FIND,
3056  NULL);
3057 
3058  /* no existing mapping, no need to update */
3059  if (!ent)
3060  continue;
3061 
3062  key.relnode = map.new_node;
3063  ItemPointerCopy(&map.new_tid,
3064  &key.tid);
3065 
3066  new_ent = (ReorderBufferTupleCidEnt *)
3067  hash_search(tuplecid_data,
3068  (void *) &key,
3069  HASH_ENTER,
3070  &found);
3071 
3072  if (found)
3073  {
3074  /*
3075  * Make sure the existing mapping makes sense. We sometime update
3076  * old records that did not yet have a cmax (e.g. pg_class' own
3077  * entry while rewriting it) during rewrites, so allow that.
3078  */
3079  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3080  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3081  }
3082  else
3083  {
3084  /* update mapping */
3085  new_ent->cmin = ent->cmin;
3086  new_ent->cmax = ent->cmax;
3087  new_ent->combocid = ent->combocid;
3088  }
3089  }
3090 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1049
signed int int32
Definition: c.h:294
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2393
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1259
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvalidCommandId
Definition: c.h:472
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:680
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1235
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 607 of file reorderbuffer.c.

References Assert, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by ReorderBufferGetOldestTXN(), and ReorderBufferTXNByXid().

608 {
609 #ifdef USE_ASSERT_CHECKING
610  dlist_iter iter;
611  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
612 
613  dlist_foreach(iter, &rb->toplevel_by_lsn)
614  {
615  ReorderBufferTXN *cur_txn;
616 
617  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
618  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
619 
620  if (cur_txn->end_lsn != InvalidXLogRecPtr)
621  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
622 
623  if (prev_first_lsn != InvalidXLogRecPtr)
624  Assert(prev_first_lsn < cur_txn->first_lsn);
625 
626  Assert(!cur_txn->is_known_as_subxact);
627  prev_first_lsn = cur_txn->first_lsn;
628  }
629 #endif
630 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:680
XLogRecPtr end_lsn

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const void *  a_p,
const void *  b_p 
)
static

Definition at line 3107 of file reorderbuffer.c.

References RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3108 {
3109  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3110  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3111 
3112  if (a->lsn < b->lsn)
3113  return -1;
3114  else if (a->lsn > b->lsn)
3115  return 1;
3116  return 0;
3117 }

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1639 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1640 {
1641  ReorderBufferTXN *txn;
1642 
1643  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1644  false);
1645 
1646  /* unknown, nothing to remove */
1647  if (txn == NULL)
1648  return;
1649 
1650  /* cosmetic... */
1651  txn->final_lsn = lsn;
1652 
1653  /* remove potential on-disk data, and deallocate */
1654  ReorderBufferCleanupTXN(rb, txn);
1655 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1665 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1666 {
1667  dlist_mutable_iter it;
1668 
1669  /*
1670  * Iterate through all (potential) toplevel TXNs and abort all that are
1671  * older than what possibly can be running. Once we've found the first
1672  * that is alive we stop, there might be some that acquired an xid earlier
1673  * but started writing later, but it's unlikely and they will be cleaned
1674  * up in a later call to this function.
1675  */
1677  {
1678  ReorderBufferTXN *txn;
1679 
1680  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1681 
1682  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1683  {
1684  /*
1685  * We set final_lsn on a transaction when we decode its commit or
1686  * abort record, but we never see those records for crashed
1687  * transactions. To ensure cleanup of these transactions, set
1688  * final_lsn to that of their last change; this causes
1689  * ReorderBufferRestoreCleanup to do the right thing.
1690  */
1691  if (txn->serialized && txn->final_lsn == 0)
1692  {
1693  ReorderBufferChange *last =
1695 
1696  txn->final_lsn = last->lsn;
1697  }
1698 
1699  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1700 
1701  /* remove potential on-disk data, and deallocate this tx */
1702  ReorderBufferCleanupTXN(rb, txn);
1703  }
1704  else
1705  return;
1706  }
1707 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog
Definition: elog.h:219

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 1892 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1895 {
1896  ReorderBufferTXN *txn;
1897 
1898  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1899 
1900  if (txn->ninvalidations != 0)
1901  elog(ERROR, "only ever add one set of invalidations");
1902 
1903  Assert(nmsgs > 0);
1904 
1905  txn->ninvalidations = nmsgs;
1908  sizeof(SharedInvalidationMessage) * nmsgs);
1909  memcpy(txn->invalidations, msgs,
1910  sizeof(SharedInvalidationMessage) * nmsgs);
1911 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:680
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
#define elog
Definition: elog.h:219

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1848 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1850 {
1852 
1853  change->data.command_id = cid;
1855 
1856  ReorderBufferQueueChange(rb, xid, lsn, change);
1857 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1864 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

1868 {
1870  ReorderBufferTXN *txn;
1871 
1872  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1873 
1874  change->data.tuplecid.node = node;
1875  change->data.tuplecid.tid = tid;
1876  change->data.tuplecid.cmin = cmin;
1877  change->data.tuplecid.cmax = cmax;
1878  change->data.tuplecid.combocid = combocid;
1879  change->lsn = lsn;
1881 
1882  dlist_push_tail(&txn->tuplecids, &change->node);
1883  txn->ntuplecids++;
1884 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
union ReorderBufferChange::@102 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
struct ReorderBufferChange::@102::@105 tuplecid
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1808 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

1810 {
1812 
1813  change->data.snapshot = snap;
1815 
1816  ReorderBufferQueueChange(rb, xid, lsn, change);
1817 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
union ReorderBufferChange::@102 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 220 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

221 {
223  HASHCTL hash_ctl;
224  MemoryContext new_ctx;
225 
226  /* allocate memory in own context, to have better accountability */
228  "ReorderBuffer",
230 
231  buffer =
232  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
233 
234  memset(&hash_ctl, 0, sizeof(hash_ctl));
235 
236  buffer->context = new_ctx;
237 
238  buffer->change_context = SlabContextCreate(new_ctx,
239  "Change",
240  0,
242  sizeof(ReorderBufferChange));
243 
244  buffer->txn_context = SlabContextCreate(new_ctx,
245  "TXN",
246  0,
248  sizeof(ReorderBufferTXN));
249 
250  buffer->tup_context = GenerationContextCreate(new_ctx,
251  "Tuples",
252  0,
254 
255  hash_ctl.keysize = sizeof(TransactionId);
256  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
257  hash_ctl.hcxt = buffer->context;
258 
259  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
261 
263  buffer->by_txn_last_txn = NULL;
264 
265  buffer->outbuf = NULL;
266  buffer->outbufsize = 0;
267 
269 
270  dlist_init(&buffer->toplevel_by_lsn);
271 
272  return buffer;
273 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:455
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext change_context
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, int flags, Size blockSize, Size chunkSize)
Definition: slab.c:190
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, int flags, Size blockSize)
Definition: generation.c:208
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:227
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:226
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 656 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

658 {
659  ReorderBufferTXN *txn;
660  ReorderBufferTXN *subtxn;
661  bool new_top;
662  bool new_sub;
663 
664  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
665  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
666 
667  if (new_sub)
668  {
669  /*
670  * we assign subtransactions to top level transaction even if we don't
671  * have data for it yet, assignment records frequently reference xids
672  * that have not yet produced any records. Knowing those aren't top
673  * level xids allows us to make processing cheaper in some places.
674  */
675  dlist_push_tail(&txn->subtxns, &subtxn->node);
676  txn->nsubtxns++;
677  }
678  else if (!subtxn->is_known_as_subxact)
679  {
680  subtxn->is_known_as_subxact = true;
681  Assert(subtxn->nsubtxns == 0);
682 
683  /* remove from lsn order list of top-level transactions */
684  dlist_delete(&subtxn->node);
685 
686  /* add to toplevel transaction */
687  dlist_push_tail(&txn->subtxns, &subtxn->node);
688  txn->nsubtxns++;
689  }
690  else if (new_top)
691  {
692  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
693  }
694 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:680
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1113 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1114 {
1115  dlist_iter iter;
1116  HASHCTL hash_ctl;
1117 
1118  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1119  return;
1120 
1121  memset(&hash_ctl, 0, sizeof(hash_ctl));
1122 
1123  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1124  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1125  hash_ctl.hcxt = rb->context;
1126 
1127  /*
1128  * create the hash with the exact number of to-be-stored tuplecids from
1129  * the start
1130  */
1131  txn->tuplecid_hash =
1132  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1134 
1135  dlist_foreach(iter, &txn->tuplecids)
1136  {
1139  bool found;
1140  ReorderBufferChange *change;
1141 
1142  change = dlist_container(ReorderBufferChange, node, iter.cur);
1143 
1145 
1146  /* be careful about padding */
1147  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1148 
1149  key.relnode = change->data.tuplecid.node;
1150 
1151  ItemPointerCopy(&change->data.tuplecid.tid,
1152  &key.tid);
1153 
1154  ent = (ReorderBufferTupleCidEnt *)
1156  (void *) &key,
1158  &found);
1159  if (!found)
1160  {
1161  ent->cmin = change->data.tuplecid.cmin;
1162  ent->cmax = change->data.tuplecid.cmax;
1163  ent->combocid = change->data.tuplecid.combocid;
1164  }
1165  else
1166  {
1167  Assert(ent->cmin == change->data.tuplecid.cmin);
1168  Assert(ent->cmax == InvalidCommandId ||
1169  ent->cmax == change->data.tuplecid.cmax);
1170 
1171  /*
1172  * if the tuple got valid in this transaction and now got deleted
1173  * we already have a valid cmin stored. The cmax will be
1174  * InvalidCommandId though.
1175  */
1176  ent->cmax = change->data.tuplecid.cmax;
1177  }
1178  }
1179 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:472
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
union ReorderBufferChange::@102 data
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:680
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
struct ReorderBufferChange::@102::@105 tuplecid
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139

◆ ReorderBufferCheckSerializeTXN()

static void ReorderBufferCheckSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2009 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2010 {
2011  /*
2012  * TODO: improve accounting so we cheaply can take subtransactions into
2013  * account here.
2014  */
2015  if (txn->nentries_mem >= max_changes_in_memory)
2016  {
2017  ReorderBufferSerializeTXN(rb, txn);
2018  Assert(txn->nentries_mem == 0);
2019  }
2020 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:680
static const Size max_changes_in_memory

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1030 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::serialized, SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1031 {
1032  bool found;
1033  dlist_mutable_iter iter;
1034 
1035  /* cleanup subtransactions & their changes */
1036  dlist_foreach_modify(iter, &txn->subtxns)
1037  {
1038  ReorderBufferTXN *subtxn;
1039 
1040  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1041 
1042  /*
1043  * Subtransactions are always associated to the toplevel TXN, even if
1044  * they originally were happening inside another subtxn, so we won't
1045  * ever recurse more than one level deep here.
1046  */
1047  Assert(subtxn->is_known_as_subxact);
1048  Assert(subtxn->nsubtxns == 0);
1049 
1050  ReorderBufferCleanupTXN(rb, subtxn);
1051  }
1052 
1053  /* cleanup changes in the toplevel txn */
1054  dlist_foreach_modify(iter, &txn->changes)
1055  {
1056  ReorderBufferChange *change;
1057 
1058  change = dlist_container(ReorderBufferChange, node, iter.cur);
1059 
1060  ReorderBufferReturnChange(rb, change);
1061  }
1062 
1063  /*
1064  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1065  * They are always stored in the toplevel transaction.
1066  */
1067  dlist_foreach_modify(iter, &txn->tuplecids)
1068  {
1069  ReorderBufferChange *change;
1070 
1071  change = dlist_container(ReorderBufferChange, node, iter.cur);
1073  ReorderBufferReturnChange(rb, change);
1074  }
1075 
1076  if (txn->base_snapshot != NULL)
1077  {
1079  txn->base_snapshot = NULL;
1081  }
1082 
1083  /*
1084  * Remove TXN from its containing list.
1085  *
1086  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1087  * parent's list of known subxacts; this leaves the parent's nsubxacts
1088  * count too high, but we don't care. Otherwise, we are deleting the TXN
1089  * from the LSN-ordered list of toplevel TXNs.
1090  */
1091  dlist_delete(&txn->node);
1092 
1093  /* now remove reference from buffer */
1094  hash_search(rb->by_txn,
1095  (void *) &txn->xid,
1096  HASH_REMOVE,
1097  &found);
1098  Assert(found);
1099 
1100  /* remove entries spilled to disk */
1101  if (txn->serialized)
1102  ReorderBufferRestoreCleanup(rb, txn);
1103 
1104  /* deallocate */
1105  ReorderBufferReturnTXN(rb, txn);
1106 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:680
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1268 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1272 {
1273  ReorderBufferTXN *txn;
1274  volatile Snapshot snapshot_now;
1275  volatile CommandId command_id = FirstCommandId;
1276  bool using_subtxn;
1277  ReorderBufferIterTXNState *volatile iterstate = NULL;
1278 
1279  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1280  false);
1281 
1282  /* unknown transaction, nothing to replay */
1283  if (txn == NULL)
1284  return;
1285 
1286  txn->final_lsn = commit_lsn;
1287  txn->end_lsn = end_lsn;
1288  txn->commit_time = commit_time;
1289  txn->origin_id = origin_id;
1290  txn->origin_lsn = origin_lsn;
1291 
1292  /*
1293  * If this transaction didn't have any real changes in our database, it's
1294  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1295  * transferred its snapshot to this transaction if it had one and the
1296  * toplevel tx didn't.
1297  */
1298  if (txn->base_snapshot == NULL)
1299  {
1300  Assert(txn->ninvalidations == 0);
1301  ReorderBufferCleanupTXN(rb, txn);
1302  return;
1303  }
1304 
1305  snapshot_now = txn->base_snapshot;
1306 
1307  /* build data to be able to lookup the CommandIds of catalog tuples */
1309 
1310  /* setup the initial snapshot */
1311  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1312 
1313  /*
1314  * Decoding needs access to syscaches et al., which in turn use
1315  * heavyweight locks and such. Thus we need to have enough state around to
1316  * keep track of those. The easiest way is to simply use a transaction
1317  * internally. That also allows us to easily enforce that nothing writes
1318  * to the database by checking for xid assignments.
1319  *
1320  * When we're called via the SQL SRF there's already a transaction
1321  * started, so start an explicit subtransaction there.
1322  */
1323  using_subtxn = IsTransactionOrTransactionBlock();
1324 
1325  PG_TRY();
1326  {
1327  ReorderBufferChange *change;
1328  ReorderBufferChange *specinsert = NULL;
1329 
1330  if (using_subtxn)
1331  BeginInternalSubTransaction("replay");
1332  else
1334 
1335  rb->begin(rb, txn);
1336 
1337  iterstate = ReorderBufferIterTXNInit(rb, txn);
1338  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1339  {
1340  Relation relation = NULL;
1341  Oid reloid;
1342 
1343  switch (change->action)
1344  {
1346 
1347  /*
1348  * Confirmation for speculative insertion arrived. Simply
1349  * use as a normal record. It'll be cleaned up at the end
1350  * of INSERT processing.
1351  */
1352  Assert(specinsert->data.tp.oldtuple == NULL);
1353  change = specinsert;
1355 
1356  /* intentionally fall through */
1360  Assert(snapshot_now);
1361 
1362  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1363  change->data.tp.relnode.relNode);
1364 
1365  /*
1366  * Catalog tuple without data, emitted while catalog was
1367  * in the process of being rewritten.
1368  */
1369  if (reloid == InvalidOid &&
1370  change->data.tp.newtuple == NULL &&
1371  change->data.tp.oldtuple == NULL)
1372  goto change_done;
1373  else if (reloid == InvalidOid)
1374  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1375  relpathperm(change->data.tp.relnode,
1376  MAIN_FORKNUM));
1377 
1378  relation = RelationIdGetRelation(reloid);
1379 
1380  if (relation == NULL)
1381  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1382  reloid,
1383  relpathperm(change->data.tp.relnode,
1384  MAIN_FORKNUM));
1385 
1386  if (!RelationIsLogicallyLogged(relation))
1387  goto change_done;
1388 
1389  /*
1390  * For now ignore sequence changes entirely. Most of the
1391  * time they don't log changes using records we
1392  * understand, so it doesn't make sense to handle the few
1393  * cases we do.
1394  */
1395  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1396  goto change_done;
1397 
1398  /* user-triggered change */
1399  if (!IsToastRelation(relation))
1400  {
1401  ReorderBufferToastReplace(rb, txn, relation, change);
1402  rb->apply_change(rb, txn, relation, change);
1403 
1404  /*
1405  * Only clear reassembled toast chunks if we're sure
1406  * they're not required anymore. The creator of the
1407  * tuple tells us.
1408  */
1409  if (change->data.tp.clear_toast_afterwards)
1410  ReorderBufferToastReset(rb, txn);
1411  }
1412  /* we're not interested in toast deletions */
1413  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1414  {
1415  /*
1416  * Need to reassemble the full toasted Datum in
1417  * memory, to ensure the chunks don't get reused till
1418  * we're done remove it from the list of this
1419  * transaction's changes. Otherwise it will get
1420  * freed/reused while restoring spooled data from
1421  * disk.
1422  */
1423  dlist_delete(&change->node);
1424  ReorderBufferToastAppendChunk(rb, txn, relation,
1425  change);
1426  }
1427 
1428  change_done:
1429 
1430  /*
1431  * Either speculative insertion was confirmed, or it was
1432  * unsuccessful and the record isn't needed anymore.
1433  */
1434  if (specinsert != NULL)
1435  {
1436  ReorderBufferReturnChange(rb, specinsert);
1437  specinsert = NULL;
1438  }
1439 
1440  if (relation != NULL)
1441  {
1442  RelationClose(relation);
1443  relation = NULL;
1444  }
1445  break;
1446 
1448 
1449  /*
1450  * Speculative insertions are dealt with by delaying the
1451  * processing of the insert until the confirmation record
1452  * arrives. For that we simply unlink the record from the
1453  * chain, so it does not get freed/reused while restoring
1454  * spooled data from disk.
1455  *
1456  * This is safe in the face of concurrent catalog changes
1457  * because the relevant relation can't be changed between
1458  * speculative insertion and confirmation due to
1459  * CheckTableNotInUse() and locking.
1460  */
1461 
1462  /* clear out a pending (and thus failed) speculation */
1463  if (specinsert != NULL)
1464  {
1465  ReorderBufferReturnChange(rb, specinsert);
1466  specinsert = NULL;
1467  }
1468 
1469  /* and memorize the pending insertion */
1470  dlist_delete(&change->node);
1471  specinsert = change;
1472  break;
1473 
1475  rb->message(rb, txn, change->lsn, true,
1476  change->data.msg.prefix,
1477  change->data.msg.message_size,
1478  change->data.msg.message);
1479  break;
1480 
1482  /* get rid of the old */
1483  TeardownHistoricSnapshot(false);
1484 
1485  if (snapshot_now->copied)
1486  {
1487  ReorderBufferFreeSnap(rb, snapshot_now);
1488  snapshot_now =
1489  ReorderBufferCopySnap(rb, change->data.snapshot,
1490  txn, command_id);
1491  }
1492 
1493  /*
1494  * Restored from disk, need to be careful not to double
1495  * free. We could introduce refcounting for that, but for
1496  * now this seems infrequent enough not to care.
1497  */
1498  else if (change->data.snapshot->copied)
1499  {
1500  snapshot_now =
1501  ReorderBufferCopySnap(rb, change->data.snapshot,
1502  txn, command_id);
1503  }
1504  else
1505  {
1506  snapshot_now = change->data.snapshot;
1507  }
1508 
1509 
1510  /* and continue with the new one */
1511  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1512  break;
1513 
1515  Assert(change->data.command_id != InvalidCommandId);
1516 
1517  if (command_id < change->data.command_id)
1518  {
1519  command_id = change->data.command_id;
1520 
1521  if (!snapshot_now->copied)
1522  {
1523  /* we don't use the global one anymore */
1524  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1525  txn, command_id);
1526  }
1527 
1528  snapshot_now->curcid = command_id;
1529 
1530  TeardownHistoricSnapshot(false);
1531  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1532 
1533  /*
1534  * Every time the CommandId is incremented, we could
1535  * see new catalog contents, so execute all
1536  * invalidations.
1537  */
1539  }
1540 
1541  break;
1542 
1544  elog(ERROR, "tuplecid value in changequeue");
1545  break;
1546  }
1547  }
1548 
1549  /*
1550  * There's a speculative insertion remaining, just clean in up, it
1551  * can't have been successful, otherwise we'd gotten a confirmation
1552  * record.
1553  */
1554  if (specinsert)
1555  {
1556  ReorderBufferReturnChange(rb, specinsert);
1557  specinsert = NULL;
1558  }
1559 
1560  /* clean up the iterator */
1561  ReorderBufferIterTXNFinish(rb, iterstate);
1562  iterstate = NULL;
1563 
1564  /* call commit callback */
1565  rb->commit(rb, txn, commit_lsn);
1566 
1567  /* this is just a sanity check against bad output plugin behaviour */
1569  elog(ERROR, "output plugin used XID %u",
1571 
1572  /* cleanup */
1573  TeardownHistoricSnapshot(false);
1574 
1575  /*
1576  * Aborting the current (sub-)transaction as a whole has the right
1577  * semantics. We want all locks acquired in here to be released, not
1578  * reassigned to the parent and we do not want any database access
1579  * have persistent effects.
1580  */
1582 
1583  /* make sure there's no cache pollution */
1585 
1586  if (using_subtxn)
1588 
1589  if (snapshot_now->copied)
1590  ReorderBufferFreeSnap(rb, snapshot_now);
1591 
1592  /* remove potential on-disk data, and deallocate */
1593  ReorderBufferCleanupTXN(rb, txn);
1594  }
1595  PG_CATCH();
1596  {
1597  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1598  if (iterstate)
1599  ReorderBufferIterTXNFinish(rb, iterstate);
1600 
1602 
1603  /*
1604  * Force cache invalidation to happen outside of a valid transaction
1605  * to prevent catalog access as we just caught an error.
1606  */
1608 
1609  /* make sure there's no cache pollution */
1611 
1612  if (using_subtxn)
1614 
1615  if (snapshot_now->copied)
1616  ReorderBufferFreeSnap(rb, snapshot_now);
1617 
1618  /* remove potential on-disk data, and deallocate */
1619  ReorderBufferCleanupTXN(rb, txn);
1620 
1621  PG_RE_THROW();
1622  }
1623  PG_END_TRY();
1624 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:469
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2985
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:96
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4466
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:584
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:471
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4277
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:435
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2122
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:472
union ReorderBufferChange::@102 data
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
struct ReorderBufferChange::@102::@104 msg
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:98
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:680
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2674
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4172
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define elog
Definition: elog.h:219
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2033
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 701 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

704 {
705  ReorderBufferTXN *txn;
706  ReorderBufferTXN *subtxn;
707 
708  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
709  InvalidXLogRecPtr, false);
710 
711  /*
712  * No need to do anything if that subtxn didn't contain any changes
713  */
714  if (!subtxn)
715  return;
716 
717  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
718 
719  if (txn == NULL)
720  elog(ERROR, "subxact logged without previous toplevel record");
721 
722  /*
723  * Pass our base snapshot to the parent transaction if it doesn't have
724  * one, or ours is older. That can happen if there are no changes in the
725  * toplevel transaction but in one of the child transactions. This allows
726  * the parent to simply use its base snapshot initially.
727  */
728  if (subtxn->base_snapshot != NULL &&
729  (txn->base_snapshot == NULL ||
730  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
731  {
732  txn->base_snapshot = subtxn->base_snapshot;
733  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
734  subtxn->base_snapshot = NULL;
736  }
737 
738  subtxn->final_lsn = commit_lsn;
739  subtxn->end_lsn = end_lsn;
740 
741  if (!subtxn->is_known_as_subxact)
742  {
743  subtxn->is_known_as_subxact = true;
744  Assert(subtxn->nsubtxns == 0);
745 
746  /* remove from lsn order list of top-level transactions */
747  dlist_delete(&subtxn->node);
748 
749  /* add to subtransaction list */
750  dlist_push_tail(&txn->subtxns, &subtxn->node);
751  txn->nsubtxns++;
752  }
753 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:680
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1187 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1189 {
1190  Snapshot snap;
1191  dlist_iter iter;
1192  int i = 0;
1193  Size size;
1194 
1195  size = sizeof(SnapshotData) +
1196  sizeof(TransactionId) * orig_snap->xcnt +
1197  sizeof(TransactionId) * (txn->nsubtxns + 1);
1198 
1199  snap = MemoryContextAllocZero(rb->context, size);
1200  memcpy(snap, orig_snap, sizeof(SnapshotData));
1201 
1202  snap->copied = true;
1203  snap->active_count = 1; /* mark as active so nobody frees it */
1204  snap->regd_count = 0;
1205  snap->xip = (TransactionId *) (snap + 1);
1206 
1207  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1208 
1209  /*
1210  * snap->subxip contains all txids that belong to our transaction which we
1211  * need to check via cmin/cmax. That's why we store the toplevel
1212  * transaction in there as well.
1213  */
1214  snap->subxip = snap->xip + snap->xcnt;
1215  snap->subxip[i++] = txn->xid;
1216 
1217  /*
1218  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1219  * Since it's an upper boundary it is safe to use it for the allocation
1220  * above.
1221  */
1222  snap->subxcnt = 1;
1223 
1224  dlist_foreach(iter, &txn->subtxns)
1225  {
1226  ReorderBufferTXN *sub_txn;
1227 
1228  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1229  snap->subxip[i++] = sub_txn->xid;
1230  snap->subxcnt++;
1231  }
1232 
1233  /* sort so we can bsearch() later */
1234  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1235 
1236  /* store the specified current CommandId */
1237  snap->curcid = cid;
1238 
1239  return snap;
1240 }
uint32 TransactionId
Definition: c.h:455
bool copied
Definition: snapshot.h:96
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:110
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:79
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:728
CommandId curcid
Definition: snapshot.h:98
size_t Size
Definition: c.h:414
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:80
int i
#define qsort(a, b, c, d)
Definition: port.h:408
TransactionId * subxip
Definition: snapshot.h:91
uint32 active_count
Definition: snapshot.h:109
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:92

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1918 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

1919 {
1920  int i;
1921 
1922  for (i = 0; i < txn->ninvalidations; i++)
1924 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1723 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1724 {
1725  ReorderBufferTXN *txn;
1726 
1727  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1728  false);
1729 
1730  /* unknown, nothing to forget */
1731  if (txn == NULL)
1732  return;
1733 
1734  /* cosmetic... */
1735  txn->final_lsn = lsn;
1736 
1737  /*
1738  * Process cache invalidation messages if there are any. Even if we're not
1739  * interested in the transaction's contents, it could have manipulated the
1740  * catalog and we need to update the caches according to that.
1741  */
1742  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1744  txn->invalidations);
1745  else
1746  Assert(txn->ninvalidations == 0);
1747 
1748  /* remove potential on-disk data, and deallocate */
1749  ReorderBufferCleanupTXN(rb, txn);
1750 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:680
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 279 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

280 {
281  MemoryContext context = rb->context;
282 
283  /*
284  * We free separately allocated data by entirely scrapping reorderbuffer's
285  * memory context.
286  */
287  MemoryContextDelete(context);
288 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
MemoryContext context

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1246 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1247 {
1248  if (snap->copied)
1249  pfree(snap);
1250  else
1252 }
bool copied
Definition: snapshot.h:96
void pfree(void *pointer)
Definition: mcxt.c:936
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 347 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

348 {
349  ReorderBufferChange *change;
350 
351  change = (ReorderBufferChange *)
353 
354  memset(change, 0, sizeof(ReorderBufferChange));
355  return change;
356 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 633 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

634 {
635  ReorderBufferTXN *txn;
636 
638  return NULL;
639 
640  AssertTXNLsnOrder(rb);
641 
643 
646  return txn;
647 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:680
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 416 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

417 {
418  ReorderBufferTupleBuf *tuple;
419  Size alloc_len;
420 
421  alloc_len = tuple_len + SizeofHeapTupleHeader;
422 
423  tuple = (ReorderBufferTupleBuf *)
425  sizeof(ReorderBufferTupleBuf) +
426  MAXIMUM_ALIGNOF + alloc_len);
427  tuple->alloc_tuple_size = alloc_len;
428  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
429 
430  return tuple;
431 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
HeapTupleData tuple
Definition: reorderbuffer.h:27
size_t Size
Definition: c.h:414
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 294 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

295 {
296  ReorderBufferTXN *txn;
297 
298  txn = (ReorderBufferTXN *)
300 
301  memset(txn, 0, sizeof(ReorderBufferTXN));
302 
303  dlist_init(&txn->changes);
304  dlist_init(&txn->tuplecids);
305  dlist_init(&txn->subtxns);
306 
307  return txn;
308 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1759 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1761 {
1762  bool use_subtxn = IsTransactionOrTransactionBlock();
1763  int i;
1764 
1765  if (use_subtxn)
1766  BeginInternalSubTransaction("replay");
1767 
1768  /*
1769  * Force invalidations to happen outside of a valid transaction - that way
1770  * entries will just be marked as invalid without accessing the catalog.
1771  * That's advantageous because we don't need to setup the full state
1772  * necessary for catalog access.
1773  */
1774  if (use_subtxn)
1776 
1777  for (i = 0; i < ninvalidations; i++)
1778  LocalExecuteInvalidationMessage(&invalidations[i]);
1779 
1780  if (use_subtxn)
1782 }
void AbortCurrentTransaction(void)
Definition: xact.c:2985
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4466
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4277
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4172
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 772 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

773 {
775  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
776  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
777 
778  if (pos_a < pos_b)
779  return 1;
780  else if (pos_a == pos_b)
781  return 0;
782  return -1;
783 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:478
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 999 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1001 {
1002  int32 off;
1003 
1004  for (off = 0; off < state->nr_txns; off++)
1005  {
1006  if (state->entries[off].fd != -1)
1007  CloseTransientFile(state->entries[off].fd);
1008  }
1009 
1010  /* free memory we might have "leaked" in the last *Next call */
1011  if (!dlist_is_empty(&state->old_change))
1012  {
1013  ReorderBufferChange *change;
1014 
1015  change = dlist_container(ReorderBufferChange, node,
1016  dlist_pop_head_node(&state->old_change));
1017  ReorderBufferReturnChange(rb, change);
1018  Assert(dlist_is_empty(&state->old_change));
1019  }
1020 
1021  binaryheap_free(state->heap);
1022  pfree(state);
1023 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:294
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:936
int CloseTransientFile(int fd)
Definition: fd.c:2563
#define Assert(condition)
Definition: c.h:680
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 790 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

791 {
792  Size nr_txns = 0;
794  dlist_iter cur_txn_i;
795  int32 off;
796 
797  /*
798  * Calculate the size of our heap: one element for every transaction that
799  * contains changes. (Besides the transactions already in the reorder
800  * buffer, we count the one we were directly passed.)
801  */
802  if (txn->nentries > 0)
803  nr_txns++;
804 
805  dlist_foreach(cur_txn_i, &txn->subtxns)
806  {
807  ReorderBufferTXN *cur_txn;
808 
809  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
810 
811  if (cur_txn->nentries > 0)
812  nr_txns++;
813  }
814 
815  /*
816  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
817  * need to allocate/build a heap then.
818  */
819 
820  /* allocate iteration state */
821  state = (ReorderBufferIterTXNState *)
823  sizeof(ReorderBufferIterTXNState) +
824  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
825 
826  state->nr_txns = nr_txns;
827  dlist_init(&state->old_change);
828 
829  for (off = 0; off < state->nr_txns; off++)
830  {
831  state->entries[off].fd = -1;
832  state->entries[off].segno = 0;
833  }
834 
835  /* allocate heap */
836  state->heap = binaryheap_allocate(state->nr_txns,
838  state);
839 
840  /*
841  * Now insert items into the binary heap, in an unordered fashion. (We
842  * will run a heap assembly step at the end; this is more efficient.)
843  */
844 
845  off = 0;
846 
847  /* add toplevel transaction if it contains changes */
848  if (txn->nentries > 0)
849  {
850  ReorderBufferChange *cur_change;
851 
852  if (txn->serialized)
853  {
854  /* serialize remaining changes */
855  ReorderBufferSerializeTXN(rb, txn);
856  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
857  &state->entries[off].segno);
858  }
859 
860  cur_change = dlist_head_element(ReorderBufferChange, node,
861  &txn->changes);
862 
863  state->entries[off].lsn = cur_change->lsn;
864  state->entries[off].change = cur_change;
865  state->entries[off].txn = txn;
866 
868  }
869 
870  /* add subtransactions if they contain changes */
871  dlist_foreach(cur_txn_i, &txn->subtxns)
872  {
873  ReorderBufferTXN *cur_txn;
874 
875  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
876 
877  if (cur_txn->nentries > 0)
878  {
879  ReorderBufferChange *cur_change;
880 
881  if (cur_txn->serialized)
882  {
883  /* serialize remaining changes */
884  ReorderBufferSerializeTXN(rb, cur_txn);
885  ReorderBufferRestoreChanges(rb, cur_txn,
886  &state->entries[off].fd,
887  &state->entries[off].segno);
888  }
889  cur_change = dlist_head_element(ReorderBufferChange, node,
890  &cur_txn->changes);
891 
892  state->entries[off].lsn = cur_change->lsn;
893  state->entries[off].change = cur_change;
894  state->entries[off].txn = cur_txn;
895 
897  }
898  }
899 
900  /* assemble a valid binary heap */
901  binaryheap_build(state->heap);
902 
903  return state;
904 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:294
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:728
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:414
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:485

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 913 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

914 {
915  ReorderBufferChange *change;
917  int32 off;
918 
919  /* nothing there anymore */
920  if (state->heap->bh_size == 0)
921  return NULL;
922 
923  off = DatumGetInt32(binaryheap_first(state->heap));
924  entry = &state->entries[off];
925 
926  /* free memory we might have "leaked" in the previous *Next call */
927  if (!dlist_is_empty(&state->old_change))
928  {
929  change = dlist_container(ReorderBufferChange, node,
931  ReorderBufferReturnChange(rb, change);
932  Assert(dlist_is_empty(&state->old_change));
933  }
934 
935  change = entry->change;
936 
937  /*
938  * update heap with information about which transaction has the next
939  * relevant change in LSN order
940  */
941 
942  /* there are in-memory changes */
943  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
944  {
945  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
946  ReorderBufferChange *next_change =
948 
949  /* txn stays the same */
950  state->entries[off].lsn = next_change->lsn;
951  state->entries[off].change = next_change;
952 
954  return change;
955  }
956 
957  /* try to load changes from disk */
958  if (entry->txn->nentries != entry->txn->nentries_mem)
959  {
960  /*
961  * Ugly: restoring changes will reuse *Change records, thus delete the
962  * current one from the per-tx list and only free in the next call.
963  */
964  dlist_delete(&change->node);
965  dlist_push_tail(&state->old_change, &change->node);
966 
967  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
968  &state->entries[off].segno))
969  {
970  /* successfully restored changes from disk */
971  ReorderBufferChange *next_change =
973  &entry->txn->changes);
974 
975  elog(DEBUG2, "restored %u/%u changes from disk",
976  (uint32) entry->txn->nentries_mem,
977  (uint32) entry->txn->nentries);
978 
979  Assert(entry->txn->nentries_mem);
980  /* txn stays the same */
981  state->entries[off].lsn = next_change->lsn;
982  state->entries[off].change = next_change;
984 
985  return change;
986  }
987  }
988 
989  /* ok, no changes there anymore, remove */
991 
992  return change;
993 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:210
#define DatumGetInt32(X)
Definition: postgres.h:478
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:294
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:306
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:680
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:485
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define elog
Definition: elog.h:219

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1795 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1796 {
1797  /* many records won't have an xid assigned, centralize check here */
1798  if (xid != InvalidTransactionId)
1799  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1800 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 535 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferCheckSerializeTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

537 {
538  ReorderBufferTXN *txn;
539 
540  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
541 
542  change->lsn = lsn;
543  Assert(InvalidXLogRecPtr != lsn);
544  dlist_push_tail(&txn->changes, &change->node);
545  txn->nentries++;
546  txn->nentries_mem++;
547 
549 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define Assert(condition)
Definition: c.h:680
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 555 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

559 {
560  if (transactional)
561  {
562  MemoryContext oldcontext;
563  ReorderBufferChange *change;
564 
566 
567  oldcontext = MemoryContextSwitchTo(rb->context);
568 
569  change = ReorderBufferGetChange(rb);
571  change->data.msg.prefix = pstrdup(prefix);
572  change->data.msg.message_size = message_size;
573  change->data.msg.message = palloc(message_size);
574  memcpy(change->data.msg.message, message, message_size);
575 
576  ReorderBufferQueueChange(rb, xid, lsn, change);
577 
578  MemoryContextSwitchTo(oldcontext);
579  }
580  else
581  {
582  ReorderBufferTXN *txn = NULL;
583  volatile Snapshot snapshot_now = snapshot;
584 
585  if (xid != InvalidTransactionId)
586  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
587 
588  /* setup snapshot to allow catalog access */
589  SetupHistoricSnapshot(snapshot_now, NULL);
590  PG_TRY();
591  {
592  rb->message(rb, txn, lsn, false, prefix, message_size, message);
593 
595  }
596  PG_CATCH();
597  {
599  PG_RE_THROW();
600  }
601  PG_END_TRY();
602  }
603 }
char * pstrdup(const char *in)
Definition: mcxt.c:1063
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2017
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@104 msg
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:680
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:835
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2001
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2409 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2411 {
2412  ReorderBufferDiskChange *ondisk;
2413  ReorderBufferChange *change;
2414 
2415  ondisk = (ReorderBufferDiskChange *) data;
2416 
2417  change = ReorderBufferGetChange(rb);
2418 
2419  /* copy static part */
2420  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2421 
2422  data += sizeof(ReorderBufferDiskChange);
2423 
2424  /* restore individual stuff */
2425  switch (change->action)
2426  {
2427  /* fall through these, they're all similar enough */
2432  if (change->data.tp.oldtuple)
2433  {
2434  uint32 tuplelen = ((HeapTuple) data)->t_len;
2435 
2436  change->data.tp.oldtuple =
2438 
2439  /* restore ->tuple */
2440  memcpy(&change->data.tp.oldtuple->tuple, data,
2441  sizeof(HeapTupleData));
2442  data += sizeof(HeapTupleData);
2443 
2444  /* reset t_data pointer into the new tuplebuf */
2445  change->data.tp.oldtuple->tuple.t_data =
2446  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2447 
2448  /* restore tuple data itself */
2449  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2450  data += tuplelen;
2451  }
2452 
2453  if (change->data.tp.newtuple)
2454  {
2455  /* here, data might not be suitably aligned! */
2456  uint32 tuplelen;
2457 
2458  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2459  sizeof(uint32));
2460 
2461  change->data.tp.newtuple =
2463 
2464  /* restore ->tuple */
2465  memcpy(&change->data.tp.newtuple->tuple, data,
2466  sizeof(HeapTupleData));
2467  data += sizeof(HeapTupleData);
2468 
2469  /* reset t_data pointer into the new tuplebuf */
2470  change->data.tp.newtuple->tuple.t_data =
2471  ReorderBufferTupleBufData(change->data.tp.newtuple);
2472 
2473  /* restore tuple data itself */
2474  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2475  data += tuplelen;
2476  }
2477 
2478  break;
2480  {
2481  Size prefix_size;
2482 
2483  /* read prefix */
2484  memcpy(&prefix_size, data, sizeof(Size));
2485  data += sizeof(Size);
2486  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2487  prefix_size);
2488  memcpy(change->data.msg.prefix, data, prefix_size);
2489  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2490  data += prefix_size;
2491 
2492  /* read the message */
2493  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2494  data += sizeof(Size);
2495  change->data.msg.message = MemoryContextAlloc(rb->context,
2496  change->data.msg.message_size);
2497  memcpy(change->data.msg.message, data,
2498  change->data.msg.message_size);
2499  data += change->data.msg.message_size;
2500 
2501  break;
2502  }
2504  {
2505  Snapshot oldsnap;
2506  Snapshot newsnap;
2507  Size size;
2508 
2509  oldsnap = (Snapshot) data;
2510 
2511  size = sizeof(SnapshotData) +
2512  sizeof(TransactionId) * oldsnap->xcnt +
2513  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2514 
2515  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2516 
2517  newsnap = change->data.snapshot;
2518 
2519  memcpy(newsnap, data, size);
2520  newsnap->xip = (TransactionId *)
2521  (((char *) newsnap) + sizeof(SnapshotData));
2522  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2523  newsnap->copied = true;
2524  break;
2525  }
2526  /* the base struct contains all the data, easy peasy */
2530  break;
2531  }
2532 
2533  dlist_push_tail(&txn->changes, &change->node);
2534  txn->nentries_mem++;
2535 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
HeapTupleData * HeapTuple
Definition: htup.h:70
uint32 TransactionId
Definition: c.h:455
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:23
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
struct ReorderBufferChange::@102::@103 tp
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:306
TransactionId * xip
Definition: snapshot.h:79
MemoryContext context
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@104 msg
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:728
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:680
size_t Size
Definition: c.h:414
uint32 xcnt
Definition: snapshot.h:80
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:603
int32 subxcnt
Definition: snapshot.h:92
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2272 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, ReplicationSlot::data, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2274 {
2275  Size restored = 0;
2276  XLogSegNo last_segno;
2277  dlist_mutable_iter cleanup_iter;
2278 
2281 
2282  /* free current entries, so we have memory for more */
2283  dlist_foreach_modify(cleanup_iter, &txn->changes)
2284  {
2286  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2287 
2288  dlist_delete(&cleanup->node);
2289  ReorderBufferReturnChange(rb, cleanup);
2290  }
2291  txn->nentries_mem = 0;
2292  Assert(dlist_is_empty(&txn->changes));
2293 
2294  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2295 
2296  while (restored < max_changes_in_memory && *segno <= last_segno)
2297  {
2298  int readBytes;
2299  ReorderBufferDiskChange *ondisk;
2300 
2301  if (*fd == -1)
2302  {
2303  XLogRecPtr recptr;
2304  char path[MAXPGPATH];
2305 
2306  /* first time in */
2307  if (*segno == 0)
2308  {
2309  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2310  }
2311 
2312  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2313  XLogSegNoOffsetToRecPtr(*segno, 0, recptr, wal_segment_size);
2314 
2315  /*
2316  * No need to care about TLIs here, only used during a single run,
2317  * so each LSN only maps to a specific WAL record.
2318  */
2319  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2321  (uint32) (recptr >> 32), (uint32) recptr);
2322 
2323  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2324  if (*fd < 0 && errno == ENOENT)
2325  {
2326  *fd = -1;
2327  (*segno)++;
2328  continue;
2329  }
2330  else if (*fd < 0)
2331  ereport(ERROR,
2333  errmsg("could not open file \"%s\": %m",
2334  path)));
2335 
2336  }
2337 
2338  /*
2339  * Read the statically sized part of a change which has information
2340  * about the total size. If we couldn't read a record, we're at the
2341  * end of this file.
2342  */
2345  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2347 
2348  /* eof */
2349  if (readBytes == 0)
2350  {
2352  *fd = -1;
2353  (*segno)++;
2354  continue;
2355  }
2356  else if (readBytes < 0)
2357  ereport(ERROR,
2359  errmsg("could not read from reorderbuffer spill file: %m")));
2360  else if (readBytes != sizeof(ReorderBufferDiskChange))
2361  ereport(ERROR,
2363  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2364  readBytes,
2365  (uint32) sizeof(ReorderBufferDiskChange))));
2366 
2367  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2368 
2370  sizeof(ReorderBufferDiskChange) + ondisk->size);
2371  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2372 
2374  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2375  ondisk->size - sizeof(ReorderBufferDiskChange));
2377 
2378  if (readBytes < 0)
2379  ereport(ERROR,
2381  errmsg("could not read from reorderbuffer spill file: %m")));
2382  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2383  ereport(ERROR,
2385  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2386  readBytes,
2387  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2388 
2389  /*
2390  * ok, read a full change from disk, now restore it into proper
2391  * in-memory format
2392  */
2393  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2394  restored++;
2395  }
2396 
2397  return restored;
2398 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:113
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
ReplicationSlotPersistentData data
Definition: slot.h:120
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1049
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2393
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
#define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes)
unsigned int uint32
Definition: c.h:306
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1259
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2563
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:873
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:680
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:414
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1235
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:557
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:13
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2541 of file reorderbuffer.c.

References Assert, cur, ReplicationSlot::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCleanupTXN().

2542 {
2543  XLogSegNo first;
2544  XLogSegNo cur;
2545  XLogSegNo last;
2546 
2549 
2550  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2551  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2552 
2553  /* iterate over all possible filenames, and delete them */
2554  for (cur = first; cur <= last; cur++)
2555  {
2556  char path[MAXPGPATH];
2557  XLogRecPtr recptr;
2558 
2559  XLogSegNoOffsetToRecPtr(cur, 0, recptr, wal_segment_size);
2560 
2561  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2563  (uint32) (recptr >> 32), (uint32) recptr);
2564  if (unlink(path) != 0 && errno != ENOENT)
2565  ereport(ERROR,
2567  errmsg("could not remove file \"%s\": %m", path)));
2568  }
2569 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:113
struct cursor * cur
Definition: ecpg.c:28
ReplicationSlotPersistentData data
Definition: slot.h:120
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
#define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes)
unsigned int uint32
Definition: c.h:306
XLogRecPtr final_lsn
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:680
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:557
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 365 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

366 {
367  /* free contained data */
368  switch (change->action)
369  {
374  if (change->data.tp.newtuple)
375  {
376  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
377  change->data.tp.newtuple = NULL;
378  }
379 
380  if (change->data.tp.oldtuple)
381  {
382  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
383  change->data.tp.oldtuple = NULL;
384  }
385  break;
387  if (change->data.msg.prefix != NULL)
388  pfree(change->data.msg.prefix);
389  change->data.msg.prefix = NULL;
390  if (change->data.msg.message != NULL)
391  pfree(change->data.msg.message);
392  change->data.msg.message = NULL;
393  break;
395  if (change->data.snapshot)
396  {
397  ReorderBufferFreeSnap(rb, change->data.snapshot);
398  change->data.snapshot = NULL;
399  }
400  break;
401  /* no data in addition to the struct itself */
405  break;
406  }
407 
408  pfree(change);
409 }
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@102::@103 tp
void pfree(void *pointer)
Definition: mcxt.c:936
union ReorderBufferChange::@102 data
struct ReorderBufferChange::@102::@104 msg
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 440 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

441 {
442  pfree(tuple);
443 }
void pfree(void *pointer)
Definition: mcxt.c:936

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 317 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

318 {
319  /* clean the lookup cache if we were cached (quite likely) */
320  if (rb->by_txn_last_xid == txn->xid)
321  {
323  rb->by_txn_last_txn = NULL;
324  }
325 
326  /* free data that's contained */
327 
328  if (txn->tuplecid_hash != NULL)
329  {
331  txn->tuplecid_hash = NULL;
332  }
333 
334  if (txn->invalidations)
335  {
336  pfree(txn->invalidations);
337  txn->invalidations = NULL;
338  }
339 
340  pfree(txn);
341 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:812
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:936
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2108 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2110 {
2111  ReorderBufferDiskChange *ondisk;
2112  Size sz = sizeof(ReorderBufferDiskChange);
2113 
2115 
2116  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2117  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2118 
2119  switch (change->action)
2120  {
2121  /* fall through these, they're all similar enough */
2126  {
2127  char *data;
2128  ReorderBufferTupleBuf *oldtup,
2129  *newtup;
2130  Size oldlen = 0;
2131  Size newlen = 0;
2132 
2133  oldtup = change->data.tp.oldtuple;
2134  newtup = change->data.tp.newtuple;
2135 
2136  if (oldtup)
2137  {
2138  sz += sizeof(HeapTupleData);
2139  oldlen = oldtup->tuple.t_len;
2140  sz += oldlen;
2141  }
2142 
2143  if (newtup)
2144  {
2145  sz += sizeof(HeapTupleData);
2146  newlen = newtup->tuple.t_len;
2147  sz += newlen;
2148  }
2149 
2150  /* make sure we have enough space */
2152 
2153  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2154  /* might have been reallocated above */
2155  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2156 
2157  if (oldlen)
2158  {
2159  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2160  data += sizeof(HeapTupleData);
2161 
2162  memcpy(data, oldtup->tuple.t_data, oldlen);
2163  data += oldlen;
2164  }
2165 
2166  if (newlen)
2167  {
2168  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2169  data += sizeof(HeapTupleData);
2170 
2171  memcpy(data, newtup->tuple.t_data, newlen);
2172  data += newlen;
2173  }
2174  break;
2175  }
2177  {
2178  char *data;
2179  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2180 
2181  sz += prefix_size + change->data.msg.message_size +
2182  sizeof(Size) + sizeof(Size);
2184 
2185  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2186 
2187  /* might have been reallocated above */
2188  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2189 
2190  /* write the prefix including the size */
2191  memcpy(data, &prefix_size, sizeof(Size));
2192  data += sizeof(Size);
2193  memcpy(data, change->data.msg.prefix,
2194  prefix_size);
2195  data += prefix_size;
2196 
2197  /* write the message including the size */
2198  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2199  data += sizeof(Size);
2200  memcpy(data, change->data.msg.message,
2201  change->data.msg.message_size);
2202  data += change->data.msg.message_size;
2203 
2204  break;
2205  }
2207  {
2208  Snapshot snap;
2209  char *data;
2210 
2211  snap = change->data.snapshot;
2212 
2213  sz += sizeof(SnapshotData) +
2214  sizeof(TransactionId) * snap->xcnt +
2215  sizeof(TransactionId) * snap->subxcnt
2216  ;
2217 
2218  /* make sure we have enough space */
2220  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2221  /* might have been reallocated above */
2222  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2223 
2224  memcpy(data, snap, sizeof(SnapshotData));
2225  data += sizeof(SnapshotData);
2226 
2227  if (snap->xcnt)
2228  {
2229  memcpy(data, snap->xip,
2230  sizeof(TransactionId) * snap->xcnt);
2231  data += sizeof(TransactionId) * snap->xcnt;
2232  }
2233 
2234  if (snap->subxcnt)
2235  {
2236  memcpy(data, snap->subxip,
2237  sizeof(TransactionId) * snap->subxcnt);
2238  data += sizeof(TransactionId) * snap->subxcnt;
2239  }
2240  break;
2241  }
2245  /* ReorderBufferChange contains everything important */
2246  break;
2247  }
2248 
2249  ondisk->size = sz;
2250 
2252  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2253  {
2254  int save_errno = errno;
2255 
2257  errno = save_errno;
2258  ereport(ERROR,
2260  errmsg("could not write to data file for XID %u: %m",
2261  txn->xid)));
2262  }
2264 
2265  Assert(ondisk->change.action == change->action);
2266 }
uint32 TransactionId
Definition: c.h:455
#define write(a, b, c)
Definition: win32.h:14
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:67
struct ReorderBufferChange::@102::@103 tp
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1259
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId * xip
Definition: snapshot.h:79
int CloseTransientFile(int fd)
Definition: fd.c:2563
union ReorderBufferChange::@102 data
TransactionId xid
struct ReorderBufferChange::@102::@104 msg
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:680
size_t Size
Definition: c.h:414
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1235
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:80
int errmsg(const char *fmt,...)
Definition: elog.c:797
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:91
int32 subxcnt
Definition: snapshot.h:92

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 1991 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

1992 {
1993  if (!rb->outbufsize)
1994  {
1995  rb->outbuf = MemoryContextAlloc(rb->context, sz);
1996  rb->outbufsize = sz;
1997  }
1998  else if (rb->outbufsize < sz)
1999  {
2000  rb->outbuf = repalloc(rb->outbuf, sz);
2001  rb->outbufsize = sz;
2002  }
2003 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:949
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2026 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, ReplicationSlot::data, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

2027 {
2028  dlist_iter subtxn_i;
2029  dlist_mutable_iter change_i;
2030  int fd = -1;
2031  XLogSegNo curOpenSegNo = 0;
2032  Size spilled = 0;
2033  char path[MAXPGPATH];
2034 
2035  elog(DEBUG2, "spill %u changes in XID %u to disk",
2036  (uint32) txn->nentries_mem, txn->xid);
2037 
2038  /* do the same to all child TXs */
2039  dlist_foreach(subtxn_i, &txn->subtxns)
2040  {
2041  ReorderBufferTXN *subtxn;
2042 
2043  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2044  ReorderBufferSerializeTXN(rb, subtxn);
2045  }
2046 
2047  /* serialize changestream */
2048  dlist_foreach_modify(change_i, &txn->changes)
2049  {
2050  ReorderBufferChange *change;
2051 
2052  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2053 
2054  /*
2055  * store in segment in which it belongs by start lsn, don't split over
2056  * multiple segments tho
2057  */
2058  if (fd == -1 ||
2059  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2060  {
2061  XLogRecPtr recptr;
2062 
2063  if (fd != -1)
2064  CloseTransientFile(fd);
2065 
2066  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2067  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr, wal_segment_size);
2068 
2069  /*
2070  * No need to care about TLIs here, only used during a single run,
2071  * so each LSN only maps to a specific WAL record.
2072  */
2073  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2075  (uint32) (recptr >> 32), (uint32) recptr);
2076 
2077  /* open segment, create it if necessary */
2078  fd = OpenTransientFile(path,
2079  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2080 
2081  if (fd < 0)
2082  ereport(ERROR,
2084  errmsg("could not open file \"%s\": %m",
2085  path)));
2086  }
2087 
2088  ReorderBufferSerializeChange(rb, txn, fd, change);
2089  dlist_delete(&change->node);
2090  ReorderBufferReturnChange(rb, change);
2091 
2092  spilled++;
2093  }
2094 
2095  Assert(spilled == txn->nentries_mem);
2096  Assert(dlist_is_empty(&txn->changes));
2097  txn->nentries_mem = 0;
2098  txn->serialized = true;
2099 
2100  if (fd != -1)
2101  CloseTransientFile(fd);
2102 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:113
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReplicationSlotPersistentData data
Definition: slot.h:120
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1049
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2393
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
#define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes)
unsigned int uint32
Definition: c.h:306
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2563
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:680
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:414
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:557
#define elog
Definition: elog.h:219
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1828 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

1830 {
1831  ReorderBufferTXN *txn;
1832  bool is_new;
1833 
1834  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1835  Assert(txn->base_snapshot == NULL);
1836  Assert(snap != NULL);
1837 
1838  txn->base_snapshot = snap;
1839  txn->base_snapshot_lsn = lsn;
1840 }
Snapshot base_snapshot
XLogRecPtr base_snapshot_lsn
#define Assert(condition)
Definition: c.h:680
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 650 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

651 {
653 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2663 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

2665 {
2666  ReorderBufferToastEnt *ent;
2667  ReorderBufferTupleBuf *newtup;
2668  bool found;
2669  int32 chunksize;
2670  bool isnull;
2671  Pointer chunk;
2672  TupleDesc desc = RelationGetDescr(relation);
2673  Oid chunk_id;
2674  int32 chunk_seq;
2675 
2676  if (txn->toast_hash == NULL)
2677  ReorderBufferToastInitHash(rb, txn);
2678 
2679  Assert(IsToastRelation(relation));
2680 
2681  newtup = change->data.tp.newtuple;
2682  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2683  Assert(!isnull);
2684  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2685  Assert(!isnull);
2686 
2687  ent = (ReorderBufferToastEnt *)
2688  hash_search(txn->toast_hash,
2689  (void *) &chunk_id,
2690  HASH_ENTER,
2691  &found);
2692 
2693  if (!found)
2694  {
2695  Assert(ent->chunk_id == chunk_id);
2696  ent->num_chunks = 0;
2697  ent->last_chunk_seq = 0;
2698  ent->size = 0;
2699  ent->reconstructed = NULL;
2700  dlist_init(&ent->chunks);
2701 
2702  if (chunk_seq != 0)
2703  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2704  chunk_seq, chunk_id);
2705  }
2706  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2707  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2708  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2709 
2710  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2711  Assert(!isnull);
2712 
2713  /* calculate size so we can allocate the right size at once later */
2714  if (!VARATT_IS_EXTENDED(chunk))
2715  chunksize = VARSIZE(chunk) - VARHDRSZ;
2716  else if (VARATT_IS_SHORT(chunk))
2717  /* could happen due to heap_form_tuple doing its thing */
2718  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2719  else
2720  elog(ERROR, "unexpected type of toast chunk");
2721 
2722  ent->size += chunksize;
2723  ent->last_chunk_seq = chunk_seq;
2724  ent->num_chunks++;
2725  dlist_push_tail(&ent->chunks, &change->node);
2726 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:724
#define DatumGetInt32(X)
Definition: postgres.h:478
#define RelationGetDescr(relation)
Definition: rel.h:437
#define VARHDRSZ_SHORT
Definition: postgres.h:269
#define VARSIZE(PTR)
Definition: postgres.h:304
#define VARHDRSZ
Definition: c.h:503
#define DatumGetObjectId(X)
Definition: postgres.h:506
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:294
struct ReorderBufferChange::@102::@103 tp
char * Pointer
Definition: c.h:283
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:306
union ReorderBufferChange::@102 data
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define Assert(condition)
Definition: c.h:680
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:326
#define DatumGetPointer(X)
Definition: postgres.h:555
#define elog
Definition: elog.h:219
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2642 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2643 {
2644  HASHCTL hash_ctl;
2645 
2646  Assert(txn->toast_hash == NULL);
2647 
2648  memset(&hash_ctl, 0, sizeof(hash_ctl));
2649  hash_ctl.keysize = sizeof(Oid);
2650  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2651  hash_ctl.hcxt = rb->context;
2652  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2654 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:680

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2736 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), tupleDesc::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

2738 {
2739  TupleDesc desc;
2740  int natt;
2741  Datum *attrs;
2742  bool *isnull;
2743  bool *free;
2744  HeapTuple tmphtup;
2745  Relation toast_rel;
2746  TupleDesc toast_desc;
2747  MemoryContext oldcontext;
2748  ReorderBufferTupleBuf *newtup;
2749 
2750  /* no toast tuples changed */
2751  if (txn->toast_hash == NULL)
2752  return;
2753 
2754  oldcontext = MemoryContextSwitchTo(rb->context);
2755 
2756  /* we should only have toast tuples in an INSERT or UPDATE */
2757  Assert(change->data.tp.newtuple);
2758 
2759  desc = RelationGetDescr(relation);
2760 
2761  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2762  toast_desc = RelationGetDescr(toast_rel);
2763 
2764  /* should we allocate from stack instead? */
2765  attrs = palloc0(sizeof(Datum) * desc->natts);
2766  isnull = palloc0(sizeof(bool) * desc->natts);
2767  free = palloc0(sizeof(bool) * desc->natts);
2768 
2769  newtup = change->data.tp.newtuple;
2770 
2771  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2772 
2773  for (natt = 0; natt < desc->natts; natt++)
2774  {
2775  Form_pg_attribute attr = TupleDescAttr(desc, natt);
2776  ReorderBufferToastEnt *ent;
2777  struct varlena *varlena;
2778 
2779  /* va_rawsize is the size of the original datum -- including header */
2780  struct varatt_external toast_pointer;
2781  struct varatt_indirect redirect_pointer;
2782  struct varlena *new_datum = NULL;
2783  struct varlena *reconstructed;
2784  dlist_iter it;
2785  Size data_done = 0;
2786 
2787  /* system columns aren't toasted */
2788  if (attr->attnum < 0)
2789  continue;
2790 
2791  if (attr->attisdropped)
2792  continue;
2793 
2794  /* not a varlena datatype */
2795  if (attr->attlen != -1)
2796  continue;
2797 
2798  /* no data */
2799  if (isnull[natt])
2800  continue;
2801 
2802  /* ok, we know we have a toast datum */
2803  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2804 
2805  /* no need to do anything if the tuple isn't external */
2806  if (!VARATT_IS_EXTERNAL(varlena))
2807  continue;
2808 
2809  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2810 
2811  /*
2812  * Check whether the toast tuple changed, replace if so.
2813  */
2814  ent = (ReorderBufferToastEnt *)
2815  hash_search(txn->toast_hash,
2816  (void *) &toast_pointer.va_valueid,
2817  HASH_FIND,
2818  NULL);
2819  if (ent == NULL)
2820  continue;
2821 
2822  new_datum =
2823  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2824 
2825  free[natt] = true;
2826 
2827  reconstructed = palloc0(toast_pointer.va_rawsize);
2828 
2829  ent->reconstructed = reconstructed;
2830 
2831  /* stitch toast tuple back together from its parts */
2832  dlist_foreach(it, &ent->chunks)
2833  {
2834  bool isnull;
2835  ReorderBufferChange *cchange;
2836  ReorderBufferTupleBuf *ctup;
2837  Pointer chunk;
2838 
2839  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2840  ctup = cchange->data.tp.newtuple;
2841  chunk = DatumGetPointer(
2842  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2843 
2844  Assert(!isnull);
2845  Assert(!VARATT_IS_EXTERNAL(chunk));
2846  Assert(!VARATT_IS_SHORT(chunk));
2847 
2848  memcpy(VARDATA(reconstructed) + data_done,
2849  VARDATA(chunk),
2850  VARSIZE(chunk) - VARHDRSZ);
2851  data_done += VARSIZE(chunk) - VARHDRSZ;
2852  }
2853  Assert(data_done == toast_pointer.va_extsize);
2854 
2855  /* make sure its marked as compressed or not */
2856  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2857  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2858  else
2859  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2860 
2861  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2862  redirect_pointer.pointer = reconstructed;
2863 
2865  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2866  sizeof(redirect_pointer));
2867 
2868  attrs[natt] = PointerGetDatum(new_datum);
2869  }
2870 
2871  /*
2872  * Build tuple in separate memory & copy tuple back into the tuplebuf
2873  * passed to the output plugin. We can't directly heap_fill_tuple() into
2874  * the tuplebuf because attrs[] will point back into the current content.
2875  */
2876  tmphtup = heap_form_tuple(desc, attrs, isnull);
2877  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2878  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2879 
2880  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2881  newtup->tuple.t_len = tmphtup->t_len;
2882 
2883  /*
2884  * free resources we won't further need, more persistent stuff will be
2885  * free'd in ReorderBufferToastReset().
2886  */
2887  RelationClose(toast_rel);
2888  pfree(tmphtup);
2889  for (natt = 0; natt < desc->natts; natt++)
2890  {
2891  if (free[natt])
2892  pfree(DatumGetPointer(attrs[natt]));
2893  }
2894  pfree(attrs);
2895  pfree(free);
2896  pfree(isnull);
2897 
2898  MemoryContextSwitchTo(oldcontext);
2899 }
#define VARDATA(PTR)
Definition: postgres.h:303
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:724
#define RelationGetDescr(relation)
Definition: rel.h:437
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define VARSIZE(PTR)
Definition: postgres.h:304
#define PointerGetDatum(X)
Definition: postgres.h:562
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
#define VARHDRSZ
Definition: c.h:503
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
Form_pg_class rd_rel
Definition: rel.h:114
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:311
int natts
Definition: tupdesc.h:79
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:314
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferChange::@102::@103 tp
void pfree(void *pointer)
Definition: mcxt.c:936
char * Pointer
Definition: c.h:283
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:566
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:332
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void RelationClose(Relation relation)
Definition: relcache.c:2122
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:864
uintptr_t Datum
Definition: postgres.h:372
union ReorderBufferChange::@102 data
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:65
#define Assert(condition)
Definition: c.h:680
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
size_t Size
Definition: c.h:414
#define DatumGetPointer(X)
Definition: postgres.h:555
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:936
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:330
Definition: c.h:497
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:328
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2033

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2905 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

2906 {
2907  HASH_SEQ_STATUS hstat;
2908  ReorderBufferToastEnt *ent;
2909 
2910  if (txn->toast_hash == NULL)
2911  return;
2912 
2913  /* sequentially walk over the hash and free everything */
2914  hash_seq_init(&hstat, txn->toast_hash);
2915  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2916  {
2917  dlist_mutable_iter it;
2918 
2919  if (ent->reconstructed != NULL)
2920  pfree(ent->reconstructed);
2921 
2922  dlist_foreach_modify(it, &ent->chunks)
2923  {
2924  ReorderBufferChange *change =
2926 
2927  dlist_delete(&change->node);
2928  ReorderBufferReturnChange(rb, change);
2929  }
2930  }
2931 
2932  hash_destroy(txn->toast_hash);
2933  txn->toast_hash = NULL;
2934 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:812
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:936
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1387
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1377

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 452 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXNByIdEnt::xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

454 {
455  ReorderBufferTXN *txn;
457  bool found;
458 
460  Assert(!create || lsn != InvalidXLogRecPtr);
461 
462  /*
463  * Check the one-entry lookup cache first
464  */
466  rb->by_txn_last_xid == xid)
467  {
468  txn = rb->by_txn_last_txn;
469 
470  if (txn != NULL)
471  {
472  /* found it, and it's valid */
473  if (is_new)
474  *is_new = false;
475  return txn;
476  }
477 
478  /*
479  * cached as non-existent, and asked not to create? Then nothing else
480  * to do.
481  */
482  if (!create)
483  return NULL;
484  /* otherwise fall through to create it */
485  }
486 
487  /*
488  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
489  * to create an entry.
490  */
491 
492  /* search the lookup table */
493  ent = (ReorderBufferTXNByIdEnt *)
494  hash_search(rb->by_txn,
495  (void *) &xid,
496  create ? HASH_ENTER : HASH_FIND,
497  &found);
498  if (found)
499  txn = ent->txn;
500  else if (create)
501  {
502  /* initialize the new entry, if creation was requested */
503  Assert(ent != NULL);
504 
505  ent->txn = ReorderBufferGetTXN(rb);
506  ent->txn->xid = xid;
507  txn = ent->txn;
508  txn->first_lsn = lsn;
510 
511  if (create_as_top)
512  {
513  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
514  AssertTXNLsnOrder(rb);
515  }
516  }
517  else
518  txn = NULL; /* not found and not asked to create */
519 
520  /* update cache */
521  rb->by_txn_last_xid = xid;
522  rb->by_txn_last_txn = txn;
523 
524  if (is_new)
525  *is_new = !found;
526 
527  Assert(!create || txn != NULL);
528  return txn;
529 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
TransactionId xid
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:680
ReorderBufferTXN * txn
Definition: reorderbuffer.c:88
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1961 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1962 {
1963  ReorderBufferTXN *txn;
1964 
1965  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1966  false);
1967 
1968  /* transaction isn't known yet, ergo no snapshot */
1969  if (txn == NULL)
1970  return false;
1971 
1972  /*
1973  * TODO: It would be a nice improvement if we would check the toplevel
1974  * transaction in subtransactions, but we'd need to keep track of a bit
1975  * more state.
1976  */
1977  return txn->base_snapshot != NULL;
1978 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1945 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildCommitTxn().

1946 {
1947  ReorderBufferTXN *txn;
1948 
1949  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1950  false);
1951  if (txn == NULL)
1952  return false;
1953 
1954  return txn->has_catalog_changes;
1955 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1930 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1932 {
1933  ReorderBufferTXN *txn;
1934 
1935  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1936 
1937  txn->has_catalog_changes = true;
1938 }
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 3213 of file reorderbuffer.c.

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy, ItemPointerGetBlockNumber, MAIN_FORKNUM, ReorderBufferTupleCidKey::relnode, HeapTupleData::t_self, HeapTupleData::t_tableOid, ReorderBufferTupleCidKey::tid, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

3217 {
3220  ForkNumber forkno;
3221  BlockNumber blockno;
3222  bool updated_mapping = false;
3223 
3224  /* be careful about padding */
3225  memset(&key, 0, sizeof(key));
3226 
3228 
3229  /*
3230  * get relfilenode from the buffer, no convenient way to access it other
3231  * than that.
3232  */
3233  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3234 
3235  /* tuples can only be in the main fork */
3236  Assert(forkno == MAIN_FORKNUM);
3237  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3238 
3239  ItemPointerCopy(&htup->t_self,
3240  &key.tid);
3241 
3242 restart:
3243  ent = (ReorderBufferTupleCidEnt *)
3244  hash_search(tuplecid_data,
3245  (void *) &key,
3246  HASH_FIND,
3247  NULL);
3248 
3249  /*
3250  * failed to find a mapping, check whether the table was rewritten and
3251  * apply mapping if so, but only do that once - there can be no new
3252  * mappings while we are in here since we have to hold a lock on the
3253  * relation.
3254  */
3255  if (ent == NULL && !updated_mapping)
3256  {
3257  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3258  /* now check but don't update for a mapping again */
3259  updated_mapping = true;
3260  goto restart;
3261  }
3262  else if (ent == NULL)
3263  return false;
3264 
3265  if (cmin)
3266  *cmin = ent->cmin;
3267  if (cmax)
3268  *cmax = ent->cmax;
3269  return true;
3270 }
uint32 BlockNumber
Definition: block.h:31
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66
ForkNumber
Definition: relpath.h:24
#define Assert(condition)
Definition: c.h:680
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
#define BufferIsLocal(buffer)
Definition: buf.h:37
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:76
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 2576 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, PANIC, ReadDir(), ReplicationSlotValidateName(), S_ISDIR, and stat.

Referenced by StartupXLOG().

2577 {
2578  DIR *logical_dir;
2579  struct dirent *logical_de;
2580 
2581  DIR *spill_dir;
2582  struct dirent *spill_de;
2583 
2584  logical_dir = AllocateDir("pg_replslot");
2585  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2586  {
2587  struct stat statbuf;
2588  char path[MAXPGPATH * 2 + 12];
2589 
2590  if (strcmp(logical_de->d_name, ".") == 0 ||
2591  strcmp(logical_de->d_name, "..") == 0)
2592  continue;
2593 
2594  /* if it cannot be a slot, skip the directory */
2595  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2596  continue;
2597 
2598  /*
2599  * ok, has to be a surviving logical slot, iterate and delete
2600  * everything starting with xid-*
2601  */
2602  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2603 
2604  /* we're only creating directories here, skip if it's not our's */
2605  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2606  continue;
2607 
2608  spill_dir = AllocateDir(path);
2609  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2610  {
2611  if (strcmp(spill_de->d_name, ".") == 0 ||
2612  strcmp(spill_de->d_name, "..") == 0)
2613  continue;
2614 
2615  /* only look at names that can be ours */
2616  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2617  {
2618  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2619  spill_de->d_name);
2620 
2621  if (unlink(path) != 0)
2622  ereport(PANIC,
2624  errmsg("could not remove file \"%s\": %m",
2625  path)));
2626  }
2627  }
2628  FreeDir(spill_dir);
2629  }
2630  FreeDir(logical_dir);
2631 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2597
#define ereport(elevel, rest)
Definition: elog.h:122
#define stat(a, b)
Definition: win32_port.h:266
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2663
#define S_ISDIR(m)
Definition: win32_port.h:307
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
int FreeDir(DIR *dir)
Definition: fd.c:2715

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static

Definition at line 3097 of file reorderbuffer.c.

References xidComparator().

Referenced by UpdateLogicalMappings().

3098 {
3099  return bsearch(&xid, xip, num,
3100  sizeof(TransactionId), xidComparator) != NULL;
3101 }
uint32 TransactionId
Definition: c.h:455
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 3124 of file reorderbuffer.c.

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_length(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), qsort, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), and TransactionIdInArray().

Referenced by ResolveCminCmaxDuringDecoding().

3125 {
3126  DIR *mapping_dir;
3127  struct dirent *mapping_de;
3128  List *files = NIL;
3129  ListCell *file;
3130  RewriteMappingFile **files_a;
3131  size_t off;
3132  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3133 
3134  mapping_dir = AllocateDir("pg_logical/mappings");
3135  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3136  {
3137  Oid f_dboid;
3138  Oid f_relid;
3139  TransactionId f_mapped_xid;
3140  TransactionId f_create_xid;
3141  XLogRecPtr f_lsn;
3142  uint32 f_hi,
3143  f_lo;
3144  RewriteMappingFile *f;
3145 
3146  if (strcmp(mapping_de->d_name, ".") == 0 ||
3147  strcmp(mapping_de->d_name, "..") == 0)
3148  continue;
3149 
3150  /* Ignore files that aren't ours */
3151  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3152  continue;
3153 
3154  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3155  &f_dboid, &f_relid, &f_hi, &f_lo,
3156  &f_mapped_xid, &f_create_xid) != 6)
3157  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3158 
3159  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3160 
3161  /* mapping for another database */
3162  if (f_dboid != dboid)
3163  continue;
3164 
3165  /* mapping for another relation */
3166  if (f_relid != relid)
3167  continue;
3168 
3169  /* did the creating transaction abort? */
3170  if (!TransactionIdDidCommit(f_create_xid))
3171  continue;
3172 
3173  /* not for our transaction */
3174  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3175  continue;
3176 
3177  /* ok, relevant, queue for apply */
3178  f = palloc(sizeof(RewriteMappingFile));
3179  f->lsn = f_lsn;
3180  strcpy(f->fname, mapping_de->d_name);
3181  files = lappend(files, f);
3182  }
3183  FreeDir(mapping_dir);
3184 
3185  /* build array we can easily sort */
3186  files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3187  off = 0;
3188  foreach(file, files)
3189  {
3190  files_a[off++] = lfirst(file);
3191  }
3192 
3193  /* sort files so we apply them in LSN order */
3194  qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3196 
3197  for (off = 0; off < list_length(files); off++)
3198  {
3199  RewriteMappingFile *f = files_a[off];
3200 
3201  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3202  snapshot->subxip[0]);
3203  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3204  pfree(f);
3205  }
3206 }
#define NIL
Definition: pg_list.h:69
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:455
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
char fname[MAXPGPATH]
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static int file_sort_by_lsn(const void *a_p, const void *b_p)
void pfree(void *pointer)
Definition: mcxt.c:936
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
unsigned int uint32
Definition: c.h:306
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2597
List * lappend(List *list, void *datum)
Definition: list.c:128
Oid MyDatabaseId
Definition: globals.c:77
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:220
#define InvalidOid
Definition: postgres_ext.h:36
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define lfirst(lc)
Definition: pg_list.h:106
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2663
static int list_length(const List *l)
Def