PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3589 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3590 {
3591  char path[MAXPGPATH];
3592  int fd;
3593  int readBytes;
3595 
3596  sprintf(path, "pg_logical/mappings/%s", fname);
3597  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3598  if (fd < 0)
3599  ereport(ERROR,
3601  errmsg("could not open file \"%s\": %m", path)));
3602 
3603  while (true)
3604  {
3607  ReorderBufferTupleCidEnt *new_ent;
3608  bool found;
3609 
3610  /* be careful about padding */
3611  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3612 
3613  /* read all mappings till the end of the file */
3615  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3617 
3618  if (readBytes < 0)
3619  ereport(ERROR,
3621  errmsg("could not read file \"%s\": %m",
3622  path)));
3623  else if (readBytes == 0) /* EOF */
3624  break;
3625  else if (readBytes != sizeof(LogicalRewriteMappingData))
3626  ereport(ERROR,
3628  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3629  path, readBytes,
3630  (int32) sizeof(LogicalRewriteMappingData))));
3631 
3632  key.relnode = map.old_node;
3633  ItemPointerCopy(&map.old_tid,
3634  &key.tid);
3635 
3636 
3637  ent = (ReorderBufferTupleCidEnt *)
3638  hash_search(tuplecid_data,
3639  (void *) &key,
3640  HASH_FIND,
3641  NULL);
3642 
3643  /* no existing mapping, no need to update */
3644  if (!ent)
3645  continue;
3646 
3647  key.relnode = map.new_node;
3648  ItemPointerCopy(&map.new_tid,
3649  &key.tid);
3650 
3651  new_ent = (ReorderBufferTupleCidEnt *)
3652  hash_search(tuplecid_data,
3653  (void *) &key,
3654  HASH_ENTER,
3655  &found);
3656 
3657  if (found)
3658  {
3659  /*
3660  * Make sure the existing mapping makes sense. We sometime update
3661  * old records that did not yet have a cmax (e.g. pg_class' own
3662  * entry while rewriting it) during rewrites, so allow that.
3663  */
3664  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3665  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3666  }
3667  else
3668  {
3669  /* update mapping */
3670  new_ent->cmin = ent->cmin;
3671  new_ent->cmax = ent->cmax;
3672  new_ent->combocid = ent->combocid;
3673  }
3674  }
3675 
3676  if (CloseTransientFile(fd) != 0)
3677  ereport(ERROR,
3679  errmsg("could not close file \"%s\": %m", path)));
3680 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1221
signed int int32
Definition: c.h:347
#define sprintf
Definition: port.h:194
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1344
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define InvalidCommandId
Definition: c.h:531
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:739
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1320
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 719 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

720 {
721 #ifdef USE_ASSERT_CHECKING
722  dlist_iter iter;
723  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
724  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
725 
726  dlist_foreach(iter, &rb->toplevel_by_lsn)
727  {
729  iter.cur);
730 
731  /* start LSN must be set */
732  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
733 
734  /* If there is an end LSN, it must be higher than start LSN */
735  if (cur_txn->end_lsn != InvalidXLogRecPtr)
736  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
737 
738  /* Current initial LSN must be strictly higher than previous */
739  if (prev_first_lsn != InvalidXLogRecPtr)
740  Assert(prev_first_lsn < cur_txn->first_lsn);
741 
742  /* known-as-subtxn txns must not be listed */
743  Assert(!rbtxn_is_known_subxact(cur_txn));
744 
745  prev_first_lsn = cur_txn->first_lsn;
746  }
747 
749  {
751  base_snapshot_node,
752  iter.cur);
753 
754  /* base snapshot (and its LSN) must be set */
755  Assert(cur_txn->base_snapshot != NULL);
757 
758  /* current LSN must be strictly higher than previous */
759  if (prev_base_snap_lsn != InvalidXLogRecPtr)
760  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
761 
762  /* known-as-subtxn txns must not be listed */
763  Assert(!rbtxn_is_known_subxact(cur_txn));
764 
765  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
766  }
767 #endif
768 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 3697 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3698 {
3701 
3702  if (a->lsn < b->lsn)
3703  return -1;
3704  else if (a->lsn > b->lsn)
3705  return 1;
3706  return 0;
3707 }
#define lfirst(lc)
Definition: pg_list.h:190

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1932 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1933 {
1934  ReorderBufferTXN *txn;
1935 
1936  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1937  false);
1938 
1939  /* unknown, nothing to remove */
1940  if (txn == NULL)
1941  return;
1942 
1943  /* cosmetic... */
1944  txn->final_lsn = lsn;
1945 
1946  /* remove potential on-disk data, and deallocate */
1947  ReorderBufferCleanupTXN(rb, txn);
1948 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1958 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, rbtxn_is_serialized, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1959 {
1960  dlist_mutable_iter it;
1961 
1962  /*
1963  * Iterate through all (potential) toplevel TXNs and abort all that are
1964  * older than what possibly can be running. Once we've found the first
1965  * that is alive we stop, there might be some that acquired an xid earlier
1966  * but started writing later, but it's unlikely and they will be cleaned
1967  * up in a later call to this function.
1968  */
1970  {
1971  ReorderBufferTXN *txn;
1972 
1973  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1974 
1975  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1976  {
1977  /*
1978  * We set final_lsn on a transaction when we decode its commit or
1979  * abort record, but we never see those records for crashed
1980  * transactions. To ensure cleanup of these transactions, set
1981  * final_lsn to that of their last change; this causes
1982  * ReorderBufferRestoreCleanup to do the right thing.
1983  */
1984  if (rbtxn_is_serialized(txn) && txn->final_lsn == 0)
1985  {
1986  ReorderBufferChange *last =
1988 
1989  txn->final_lsn = last->lsn;
1990  }
1991 
1992  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1993 
1994  /* remove potential on-disk data, and deallocate this tx */
1995  ReorderBufferCleanupTXN(rb, txn);
1996  }
1997  else
1998  return;
1999  }
2000 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:228
#define rbtxn_is_serialized(txn)

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 2234 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2237 {
2238  ReorderBufferTXN *txn;
2239 
2240  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2241 
2242  if (txn->ninvalidations != 0)
2243  elog(ERROR, "only ever add one set of invalidations");
2244 
2245  Assert(nmsgs > 0);
2246 
2247  txn->ninvalidations = nmsgs;
2250  sizeof(SharedInvalidationMessage) * nmsgs);
2251  memcpy(txn->invalidations, msgs,
2252  sizeof(SharedInvalidationMessage) * nmsgs);
2253 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:739
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:228

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2150 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2152 {
2154 
2155  change->data.command_id = cid;
2157 
2158  ReorderBufferQueueChange(rb, xid, lsn, change);
2159 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2205 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2209 {
2211  ReorderBufferTXN *txn;
2212 
2213  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2214 
2215  change->data.tuplecid.node = node;
2216  change->data.tuplecid.tid = tid;
2217  change->data.tuplecid.cmin = cmin;
2218  change->data.tuplecid.cmax = cmax;
2219  change->data.tuplecid.combocid = combocid;
2220  change->lsn = lsn;
2221  change->txn = txn;
2223 
2224  dlist_push_tail(&txn->tuplecids, &change->node);
2225  txn->ntuplecids++;
2226 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@101::@105 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2101 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2103 {
2105 
2106  change->data.snapshot = snap;
2108 
2109  ReorderBufferQueueChange(rb, xid, lsn, change);
2110 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 272 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

273 {
274  ReorderBuffer *buffer;
275  HASHCTL hash_ctl;
276  MemoryContext new_ctx;
277 
278  Assert(MyReplicationSlot != NULL);
279 
280  /* allocate memory in own context, to have better accountability */
282  "ReorderBuffer",
284 
285  buffer =
286  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
287 
288  memset(&hash_ctl, 0, sizeof(hash_ctl));
289 
290  buffer->context = new_ctx;
291 
292  buffer->change_context = SlabContextCreate(new_ctx,
293  "Change",
295  sizeof(ReorderBufferChange));
296 
297  buffer->txn_context = SlabContextCreate(new_ctx,
298  "TXN",
300  sizeof(ReorderBufferTXN));
301 
302  buffer->tup_context = GenerationContextCreate(new_ctx,
303  "Tuples",
305 
306  hash_ctl.keysize = sizeof(TransactionId);
307  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
308  hash_ctl.hcxt = buffer->context;
309 
310  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
312 
314  buffer->by_txn_last_txn = NULL;
315 
316  buffer->outbuf = NULL;
317  buffer->outbufsize = 0;
318  buffer->size = 0;
319 
320  buffer->spillCount = 0;
321  buffer->spillTxns = 0;
322  buffer->spillBytes = 0;
323 
325 
326  dlist_init(&buffer->toplevel_by_lsn);
328 
329  /*
330  * Ensure there's no stale data from prior uses of this slot, in case some
331  * prior exit avoided calling ReorderBufferFree. Failure to do this can
332  * produce duplicated txns, and it's very cheap if there's nothing there.
333  */
335 
336  return buffer;
337 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:514
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:190
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define NameStr(name)
Definition: c.h:616
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 830 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by DecodeXactOp(), and ReorderBufferCommitChild().

832 {
833  ReorderBufferTXN *txn;
834  ReorderBufferTXN *subtxn;
835  bool new_top;
836  bool new_sub;
837 
838  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
839  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
840 
841  if (new_top && !new_sub)
842  elog(ERROR, "subtransaction logged without previous top-level txn record");
843 
844  if (!new_sub)
845  {
846  if (rbtxn_is_known_subxact(subtxn))
847  {
848  /* already associated, nothing to do */
849  return;
850  }
851  else
852  {
853  /*
854  * We already saw this transaction, but initially added it to the
855  * list of top-level txns. Now that we know it's not top-level,
856  * remove it from there.
857  */
858  dlist_delete(&subtxn->node);
859  }
860  }
861 
862  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
863  subtxn->toplevel_xid = xid;
864  Assert(subtxn->nsubtxns == 0);
865 
866  /* add to subtransaction list */
867  dlist_push_tail(&txn->subtxns, &subtxn->node);
868  txn->nsubtxns++;
869 
870  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
872 
873  /* Verify LSN-ordering invariant */
874  AssertTXNLsnOrder(rb);
875 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
#define RBTXN_IS_SUBXACT
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog(elevel,...)
Definition: elog.h:228
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1351 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1352 {
1353  dlist_iter iter;
1354  HASHCTL hash_ctl;
1355 
1357  return;
1358 
1359  memset(&hash_ctl, 0, sizeof(hash_ctl));
1360 
1361  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1362  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1363  hash_ctl.hcxt = rb->context;
1364 
1365  /*
1366  * create the hash with the exact number of to-be-stored tuplecids from
1367  * the start
1368  */
1369  txn->tuplecid_hash =
1370  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1372 
1373  dlist_foreach(iter, &txn->tuplecids)
1374  {
1377  bool found;
1378  ReorderBufferChange *change;
1379 
1380  change = dlist_container(ReorderBufferChange, node, iter.cur);
1381 
1383 
1384  /* be careful about padding */
1385  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1386 
1387  key.relnode = change->data.tuplecid.node;
1388 
1389  ItemPointerCopy(&change->data.tuplecid.tid,
1390  &key.tid);
1391 
1392  ent = (ReorderBufferTupleCidEnt *)
1394  (void *) &key,
1396  &found);
1397  if (!found)
1398  {
1399  ent->cmin = change->data.tuplecid.cmin;
1400  ent->cmax = change->data.tuplecid.cmax;
1401  ent->combocid = change->data.tuplecid.combocid;
1402  }
1403  else
1404  {
1405  /*
1406  * Maybe we already saw this tuple before in this transaction, but
1407  * if so it must have the same cmin.
1408  */
1409  Assert(ent->cmin == change->data.tuplecid.cmin);
1410 
1411  /*
1412  * cmax may be initially invalid, but once set it can only grow,
1413  * and never become invalid again.
1414  */
1415  Assert((ent->cmax == InvalidCommandId) ||
1416  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1417  (change->data.tuplecid.cmax > ent->cmax)));
1418  ent->cmax = change->data.tuplecid.cmax;
1419  }
1420  }
1421 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
struct ReorderBufferChange::@101::@105 tuplecid
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:531
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 2166 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

2169 {
2170  Size sz;
2171 
2172  Assert(change->txn);
2173 
2174  /*
2175  * Ignore tuple CID changes, because those are not evicted when reaching
2176  * memory limit. So we just don't count them, because it might easily
2177  * trigger a pointless attempt to spill.
2178  */
2180  return;
2181 
2182  sz = ReorderBufferChangeSize(change);
2183 
2184  if (addition)
2185  {
2186  change->txn->size += sz;
2187  rb->size += sz;
2188  }
2189  else
2190  {
2191  Assert((rb->size >= sz) && (change->txn->size >= sz));
2192  change->txn->size -= sz;
2193  rb->size -= sz;
2194  }
2195 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:739
size_t Size
Definition: c.h:467

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 2707 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

2708 {
2709  Size sz = sizeof(ReorderBufferChange);
2710 
2711  switch (change->action)
2712  {
2713  /* fall through these, they're all similar enough */
2718  {
2719  ReorderBufferTupleBuf *oldtup,
2720  *newtup;
2721  Size oldlen = 0;
2722  Size newlen = 0;
2723 
2724  oldtup = change->data.tp.oldtuple;
2725  newtup = change->data.tp.newtuple;
2726 
2727  if (oldtup)
2728  {
2729  sz += sizeof(HeapTupleData);
2730  oldlen = oldtup->tuple.t_len;
2731  sz += oldlen;
2732  }
2733 
2734  if (newtup)
2735  {
2736  sz += sizeof(HeapTupleData);
2737  newlen = newtup->tuple.t_len;
2738  sz += newlen;
2739  }
2740 
2741  break;
2742  }
2744  {
2745  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2746 
2747  sz += prefix_size + change->data.msg.message_size +
2748  sizeof(Size) + sizeof(Size);
2749 
2750  break;
2751  }
2753  {
2754  Snapshot snap;
2755 
2756  snap = change->data.snapshot;
2757 
2758  sz += sizeof(SnapshotData) +
2759  sizeof(TransactionId) * snap->xcnt +
2760  sizeof(TransactionId) * snap->subxcnt;
2761 
2762  break;
2763  }
2765  {
2766  sz += sizeof(Oid) * change->data.truncate.nrelids;
2767 
2768  break;
2769  }
2773  /* ReorderBufferChange contains everything important */
2774  break;
2775  }
2776 
2777  return sz;
2778 }
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange::@101::@103 truncate
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 2393 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

2394 {
2395  ReorderBufferTXN *txn;
2396 
2397  /* bail out if we haven't exceeded the memory limit */
2398  if (rb->size < logical_decoding_work_mem * 1024L)
2399  return;
2400 
2401  /*
2402  * Pick the largest transaction (or subtransaction) and evict it from
2403  * memory by serializing it to disk.
2404  */
2405  txn = ReorderBufferLargestTXN(rb);
2406 
2407  ReorderBufferSerializeTXN(rb, txn);
2408 
2409  /*
2410  * After eviction, the transaction should have no entries in memory, and
2411  * should use 0 bytes for changes.
2412  */
2413  Assert(txn->size == 0);
2414  Assert(txn->nentries_mem == 0);
2415 
2416  /*
2417  * And furthermore, evicting the transaction should get us below the
2418  * memory limit again - it is not possible that we're still exceeding the
2419  * memory limit after evicting the transaction.
2420  *
2421  * This follows from the simple fact that the selected transaction is at
2422  * least as large as the most recent change (which caused us to go over
2423  * the memory limit). So by evicting it we're definitely back below the
2424  * memory limit.
2425  */
2426  Assert(rb->size < logical_decoding_work_mem * 1024L);
2427 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:739
int logical_decoding_work_mem

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 3109 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

3110 {
3111  DIR *spill_dir;
3112  struct dirent *spill_de;
3113  struct stat statbuf;
3114  char path[MAXPGPATH * 2 + 12];
3115 
3116  sprintf(path, "pg_replslot/%s", slotname);
3117 
3118  /* we're only handling directories here, skip if it's not ours */
3119  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3120  return;
3121 
3122  spill_dir = AllocateDir(path);
3123  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3124  {
3125  /* only look at names that can be ours */
3126  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3127  {
3128  snprintf(path, sizeof(path),
3129  "pg_replslot/%s/%s", slotname,
3130  spill_de->d_name);
3131 
3132  if (unlink(path) != 0)
3133  ereport(ERROR,
3135  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3136  path, slotname)));
3137  }
3138  }
3139  FreeDir(spill_dir);
3140 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2584
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:194
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2503
#define ereport(elevel, rest)
Definition: elog.h:141
#define stat(a, b)
Definition: win32_port.h:255
#define S_ISDIR(m)
Definition: win32_port.h:296
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:822
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:192
int FreeDir(DIR *dir)
Definition: fd.c:2621

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1259 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1260 {
1261  bool found;
1262  dlist_mutable_iter iter;
1263 
1264  /* cleanup subtransactions & their changes */
1265  dlist_foreach_modify(iter, &txn->subtxns)
1266  {
1267  ReorderBufferTXN *subtxn;
1268 
1269  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1270 
1271  /*
1272  * Subtransactions are always associated to the toplevel TXN, even if
1273  * they originally were happening inside another subtxn, so we won't
1274  * ever recurse more than one level deep here.
1275  */
1276  Assert(rbtxn_is_known_subxact(subtxn));
1277  Assert(subtxn->nsubtxns == 0);
1278 
1279  ReorderBufferCleanupTXN(rb, subtxn);
1280  }
1281 
1282  /* cleanup changes in the toplevel txn */
1283  dlist_foreach_modify(iter, &txn->changes)
1284  {
1285  ReorderBufferChange *change;
1286 
1287  change = dlist_container(ReorderBufferChange, node, iter.cur);
1288 
1289  /* Check we're not mixing changes from different transactions. */
1290  Assert(change->txn == txn);
1291 
1292  ReorderBufferReturnChange(rb, change);
1293  }
1294 
1295  /*
1296  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1297  * They are always stored in the toplevel transaction.
1298  */
1299  dlist_foreach_modify(iter, &txn->tuplecids)
1300  {
1301  ReorderBufferChange *change;
1302 
1303  change = dlist_container(ReorderBufferChange, node, iter.cur);
1304 
1305  /* Check we're not mixing changes from different transactions. */
1306  Assert(change->txn == txn);
1308 
1309  ReorderBufferReturnChange(rb, change);
1310  }
1311 
1312  /*
1313  * Cleanup the base snapshot, if set.
1314  */
1315  if (txn->base_snapshot != NULL)
1316  {
1319  }
1320 
1321  /*
1322  * Remove TXN from its containing list.
1323  *
1324  * Note: if txn is known as subxact, we are deleting the TXN from its
1325  * parent's list of known subxacts; this leaves the parent's nsubxacts
1326  * count too high, but we don't care. Otherwise, we are deleting the TXN
1327  * from the LSN-ordered list of toplevel TXNs.
1328  */
1329  dlist_delete(&txn->node);
1330 
1331  /* now remove reference from buffer */
1332  hash_search(rb->by_txn,
1333  (void *) &txn->xid,
1334  HASH_REMOVE,
1335  &found);
1336  Assert(found);
1337 
1338  /* remove entries spilled to disk */
1339  if (rbtxn_is_serialized(txn))
1340  ReorderBufferRestoreCleanup(rb, txn);
1341 
1342  /* deallocate */
1343  ReorderBufferReturnTXN(rb, txn);
1344 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:739
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1510 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1514 {
1515  ReorderBufferTXN *txn;
1516  volatile Snapshot snapshot_now;
1517  volatile CommandId command_id = FirstCommandId;
1518  bool using_subtxn;
1519  ReorderBufferIterTXNState *volatile iterstate = NULL;
1520 
1521  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1522  false);
1523 
1524  /* unknown transaction, nothing to replay */
1525  if (txn == NULL)
1526  return;
1527 
1528  txn->final_lsn = commit_lsn;
1529  txn->end_lsn = end_lsn;
1530  txn->commit_time = commit_time;
1531  txn->origin_id = origin_id;
1532  txn->origin_lsn = origin_lsn;
1533 
1534  /*
1535  * If this transaction has no snapshot, it didn't make any changes to the
1536  * database, so there's nothing to decode. Note that
1537  * ReorderBufferCommitChild will have transferred any snapshots from
1538  * subtransactions if there were any.
1539  */
1540  if (txn->base_snapshot == NULL)
1541  {
1542  Assert(txn->ninvalidations == 0);
1543  ReorderBufferCleanupTXN(rb, txn);
1544  return;
1545  }
1546 
1547  snapshot_now = txn->base_snapshot;
1548 
1549  /* build data to be able to lookup the CommandIds of catalog tuples */
1551 
1552  /* setup the initial snapshot */
1553  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1554 
1555  /*
1556  * Decoding needs access to syscaches et al., which in turn use
1557  * heavyweight locks and such. Thus we need to have enough state around to
1558  * keep track of those. The easiest way is to simply use a transaction
1559  * internally. That also allows us to easily enforce that nothing writes
1560  * to the database by checking for xid assignments.
1561  *
1562  * When we're called via the SQL SRF there's already a transaction
1563  * started, so start an explicit subtransaction there.
1564  */
1565  using_subtxn = IsTransactionOrTransactionBlock();
1566 
1567  PG_TRY();
1568  {
1569  ReorderBufferChange *change;
1570  ReorderBufferChange *specinsert = NULL;
1571 
1572  if (using_subtxn)
1573  BeginInternalSubTransaction("replay");
1574  else
1576 
1577  rb->begin(rb, txn);
1578 
1579  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1580  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1581  {
1582  Relation relation = NULL;
1583  Oid reloid;
1584 
1585  switch (change->action)
1586  {
1588 
1589  /*
1590  * Confirmation for speculative insertion arrived. Simply
1591  * use as a normal record. It'll be cleaned up at the end
1592  * of INSERT processing.
1593  */
1594  if (specinsert == NULL)
1595  elog(ERROR, "invalid ordering of speculative insertion changes");
1596  Assert(specinsert->data.tp.oldtuple == NULL);
1597  change = specinsert;
1599 
1600  /* intentionally fall through */
1604  Assert(snapshot_now);
1605 
1606  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1607  change->data.tp.relnode.relNode);
1608 
1609  /*
1610  * Mapped catalog tuple without data, emitted while
1611  * catalog table was in the process of being rewritten. We
1612  * can fail to look up the relfilenode, because the
1613  * relmapper has no "historic" view, in contrast to normal
1614  * the normal catalog during decoding. Thus repeated
1615  * rewrites can cause a lookup failure. That's OK because
1616  * we do not decode catalog changes anyway. Normally such
1617  * tuples would be skipped over below, but we can't
1618  * identify whether the table should be logically logged
1619  * without mapping the relfilenode to the oid.
1620  */
1621  if (reloid == InvalidOid &&
1622  change->data.tp.newtuple == NULL &&
1623  change->data.tp.oldtuple == NULL)
1624  goto change_done;
1625  else if (reloid == InvalidOid)
1626  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1627  relpathperm(change->data.tp.relnode,
1628  MAIN_FORKNUM));
1629 
1630  relation = RelationIdGetRelation(reloid);
1631 
1632  if (!RelationIsValid(relation))
1633  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1634  reloid,
1635  relpathperm(change->data.tp.relnode,
1636  MAIN_FORKNUM));
1637 
1638  if (!RelationIsLogicallyLogged(relation))
1639  goto change_done;
1640 
1641  /*
1642  * Ignore temporary heaps created during DDL unless the
1643  * plugin has asked for them.
1644  */
1645  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1646  goto change_done;
1647 
1648  /*
1649  * For now ignore sequence changes entirely. Most of the
1650  * time they don't log changes using records we
1651  * understand, so it doesn't make sense to handle the few
1652  * cases we do.
1653  */
1654  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1655  goto change_done;
1656 
1657  /* user-triggered change */
1658  if (!IsToastRelation(relation))
1659  {
1660  ReorderBufferToastReplace(rb, txn, relation, change);
1661  rb->apply_change(rb, txn, relation, change);
1662 
1663  /*
1664  * Only clear reassembled toast chunks if we're sure
1665  * they're not required anymore. The creator of the
1666  * tuple tells us.
1667  */
1668  if (change->data.tp.clear_toast_afterwards)
1669  ReorderBufferToastReset(rb, txn);
1670  }
1671  /* we're not interested in toast deletions */
1672  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1673  {
1674  /*
1675  * Need to reassemble the full toasted Datum in
1676  * memory, to ensure the chunks don't get reused till
1677  * we're done remove it from the list of this
1678  * transaction's changes. Otherwise it will get
1679  * freed/reused while restoring spooled data from
1680  * disk.
1681  */
1682  Assert(change->data.tp.newtuple != NULL);
1683 
1684  dlist_delete(&change->node);
1685  ReorderBufferToastAppendChunk(rb, txn, relation,
1686  change);
1687  }
1688 
1689  change_done:
1690 
1691  /*
1692  * Either speculative insertion was confirmed, or it was
1693  * unsuccessful and the record isn't needed anymore.
1694  */
1695  if (specinsert != NULL)
1696  {
1697  ReorderBufferReturnChange(rb, specinsert);
1698  specinsert = NULL;
1699  }
1700 
1701  if (relation != NULL)
1702  {
1703  RelationClose(relation);
1704  relation = NULL;
1705  }
1706  break;
1707 
1709 
1710  /*
1711  * Speculative insertions are dealt with by delaying the
1712  * processing of the insert until the confirmation record
1713  * arrives. For that we simply unlink the record from the
1714  * chain, so it does not get freed/reused while restoring
1715  * spooled data from disk.
1716  *
1717  * This is safe in the face of concurrent catalog changes
1718  * because the relevant relation can't be changed between
1719  * speculative insertion and confirmation due to
1720  * CheckTableNotInUse() and locking.
1721  */
1722 
1723  /* clear out a pending (and thus failed) speculation */
1724  if (specinsert != NULL)
1725  {
1726  ReorderBufferReturnChange(rb, specinsert);
1727  specinsert = NULL;
1728  }
1729 
1730  /* and memorize the pending insertion */
1731  dlist_delete(&change->node);
1732  specinsert = change;
1733  break;
1734 
1736  {
1737  int i;
1738  int nrelids = change->data.truncate.nrelids;
1739  int nrelations = 0;
1740  Relation *relations;
1741 
1742  relations = palloc0(nrelids * sizeof(Relation));
1743  for (i = 0; i < nrelids; i++)
1744  {
1745  Oid relid = change->data.truncate.relids[i];
1746  Relation relation;
1747 
1748  relation = RelationIdGetRelation(relid);
1749 
1750  if (!RelationIsValid(relation))
1751  elog(ERROR, "could not open relation with OID %u", relid);
1752 
1753  if (!RelationIsLogicallyLogged(relation))
1754  continue;
1755 
1756  relations[nrelations++] = relation;
1757  }
1758 
1759  rb->apply_truncate(rb, txn, nrelations, relations, change);
1760 
1761  for (i = 0; i < nrelations; i++)
1762  RelationClose(relations[i]);
1763 
1764  break;
1765  }
1766 
1768  rb->message(rb, txn, change->lsn, true,
1769  change->data.msg.prefix,
1770  change->data.msg.message_size,
1771  change->data.msg.message);
1772  break;
1773 
1775  /* get rid of the old */
1776  TeardownHistoricSnapshot(false);
1777 
1778  if (snapshot_now->copied)
1779  {
1780  ReorderBufferFreeSnap(rb, snapshot_now);
1781  snapshot_now =
1782  ReorderBufferCopySnap(rb, change->data.snapshot,
1783  txn, command_id);
1784  }
1785 
1786  /*
1787  * Restored from disk, need to be careful not to double
1788  * free. We could introduce refcounting for that, but for
1789  * now this seems infrequent enough not to care.
1790  */
1791  else if (change->data.snapshot->copied)
1792  {
1793  snapshot_now =
1794  ReorderBufferCopySnap(rb, change->data.snapshot,
1795  txn, command_id);
1796  }
1797  else
1798  {
1799  snapshot_now = change->data.snapshot;
1800  }
1801 
1802 
1803  /* and continue with the new one */
1804  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1805  break;
1806 
1808  Assert(change->data.command_id != InvalidCommandId);
1809 
1810  if (command_id < change->data.command_id)
1811  {
1812  command_id = change->data.command_id;
1813 
1814  if (!snapshot_now->copied)
1815  {
1816  /* we don't use the global one anymore */
1817  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1818  txn, command_id);
1819  }
1820 
1821  snapshot_now->curcid = command_id;
1822 
1823  TeardownHistoricSnapshot(false);
1824  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1825 
1826  /*
1827  * Every time the CommandId is incremented, we could
1828  * see new catalog contents, so execute all
1829  * invalidations.
1830  */
1832  }
1833 
1834  break;
1835 
1837  elog(ERROR, "tuplecid value in changequeue");
1838  break;
1839  }
1840  }
1841 
1842  /*
1843  * There's a speculative insertion remaining, just clean in up, it
1844  * can't have been successful, otherwise we'd gotten a confirmation
1845  * record.
1846  */
1847  if (specinsert)
1848  {
1849  ReorderBufferReturnChange(rb, specinsert);
1850  specinsert = NULL;
1851  }
1852 
1853  /* clean up the iterator */
1854  ReorderBufferIterTXNFinish(rb, iterstate);
1855  iterstate = NULL;
1856 
1857  /* call commit callback */
1858  rb->commit(rb, txn, commit_lsn);
1859 
1860  /* this is just a sanity check against bad output plugin behaviour */
1862  elog(ERROR, "output plugin used XID %u",
1864 
1865  /* cleanup */
1866  TeardownHistoricSnapshot(false);
1867 
1868  /*
1869  * Aborting the current (sub-)transaction as a whole has the right
1870  * semantics. We want all locks acquired in here to be released, not
1871  * reassigned to the parent and we do not want any database access
1872  * have persistent effects.
1873  */
1875 
1876  /* make sure there's no cache pollution */
1878 
1879  if (using_subtxn)
1881 
1882  if (snapshot_now->copied)
1883  ReorderBufferFreeSnap(rb, snapshot_now);
1884 
1885  /* remove potential on-disk data, and deallocate */
1886  ReorderBufferCleanupTXN(rb, txn);
1887  }
1888  PG_CATCH();
1889  {
1890  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1891  if (iterstate)
1892  ReorderBufferIterTXNFinish(rb, iterstate);
1893 
1895 
1896  /*
1897  * Force cache invalidation to happen outside of a valid transaction
1898  * to prevent catalog access as we just caught an error.
1899  */
1901 
1902  /* make sure there's no cache pollution */
1904 
1905  if (using_subtxn)
1907 
1908  if (snapshot_now->copied)
1909  ReorderBufferFreeSnap(rb, snapshot_now);
1910 
1911  /* remove potential on-disk data, and deallocate */
1912  ReorderBufferCleanupTXN(rb, txn);
1913 
1914  PG_RE_THROW();
1915  }
1916  PG_END_TRY();
1917 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:528
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:3162
bool IsToastRelation(Relation relation)
Definition: catalog.c:141
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:600
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:530
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:401
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4464
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:439
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2074
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidCommandId
Definition: c.h:531
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:332
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2797
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4359
#define PG_RE_THROW()
Definition: elog.h:363
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferApplyTruncateCB apply_truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:228
int i
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:322
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1975
#define PG_END_TRY()
Definition: elog.h:347
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 950 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

953 {
954  ReorderBufferTXN *subtxn;
955 
956  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
957  InvalidXLogRecPtr, false);
958 
959  /*
960  * No need to do anything if that subtxn didn't contain any changes
961  */
962  if (!subtxn)
963  return;
964 
965  subtxn->final_lsn = commit_lsn;
966  subtxn->end_lsn = end_lsn;
967 
968  /*
969  * Assign this subxact as a child of the toplevel xact (no-op if already
970  * done.)
971  */
973 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1429 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1431 {
1432  Snapshot snap;
1433  dlist_iter iter;
1434  int i = 0;
1435  Size size;
1436 
1437  size = sizeof(SnapshotData) +
1438  sizeof(TransactionId) * orig_snap->xcnt +
1439  sizeof(TransactionId) * (txn->nsubtxns + 1);
1440 
1441  snap = MemoryContextAllocZero(rb->context, size);
1442  memcpy(snap, orig_snap, sizeof(SnapshotData));
1443 
1444  snap->copied = true;
1445  snap->active_count = 1; /* mark as active so nobody frees it */
1446  snap->regd_count = 0;
1447  snap->xip = (TransactionId *) (snap + 1);
1448 
1449  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1450 
1451  /*
1452  * snap->subxip contains all txids that belong to our transaction which we
1453  * need to check via cmin/cmax. That's why we store the toplevel
1454  * transaction in there as well.
1455  */
1456  snap->subxip = snap->xip + snap->xcnt;
1457  snap->subxip[i++] = txn->xid;
1458 
1459  /*
1460  * subxcnt isn't decreased when subtransactions abort, so count manually.
1461  * Since it's an upper boundary it is safe to use it for the allocation
1462  * above.
1463  */
1464  snap->subxcnt = 1;
1465 
1466  dlist_foreach(iter, &txn->subtxns)
1467  {
1468  ReorderBufferTXN *sub_txn;
1469 
1470  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1471  snap->subxip[i++] = sub_txn->xid;
1472  snap->subxcnt++;
1473  }
1474 
1475  /* sort so we can bsearch() later */
1476  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1477 
1478  /* store the specified current CommandId */
1479  snap->curcid = cid;
1480 
1481  return snap;
1482 }
uint32 TransactionId
Definition: c.h:514
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:199
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:467
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:491
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2260 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

2261 {
2262  int i;
2263 
2264  for (i = 0; i < txn->ninvalidations; i++)
2266 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2016 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2017 {
2018  ReorderBufferTXN *txn;
2019 
2020  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2021  false);
2022 
2023  /* unknown, nothing to forget */
2024  if (txn == NULL)
2025  return;
2026 
2027  /* cosmetic... */
2028  txn->final_lsn = lsn;
2029 
2030  /*
2031  * Process cache invalidation messages if there are any. Even if we're not
2032  * interested in the transaction's contents, it could have manipulated the
2033  * catalog and we need to update the caches according to that.
2034  */
2035  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2037  txn->invalidations);
2038  else
2039  Assert(txn->ninvalidations == 0);
2040 
2041  /* remove potential on-disk data, and deallocate */
2042  ReorderBufferCleanupTXN(rb, txn);
2043 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:739
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 343 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

344 {
345  MemoryContext context = rb->context;
346 
347  /*
348  * We free separately allocated data by entirely scrapping reorderbuffer's
349  * memory context.
350  */
351  MemoryContextDelete(context);
352 
353  /* Free disk space used by unconsumed reorder buffers */
355 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:616
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1488 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1489 {
1490  if (snap->copied)
1491  pfree(snap);
1492  else
1494 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1056
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 411 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

412 {
413  ReorderBufferChange *change;
414 
415  change = (ReorderBufferChange *)
417 
418  memset(change, 0, sizeof(ReorderBufferChange));
419  return change;
420 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 775 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

776 {
777  ReorderBufferTXN *txn;
778 
779  AssertTXNLsnOrder(rb);
780 
782  return NULL;
783 
785 
788  return txn;
789 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 803 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

804 {
805  ReorderBufferTXN *txn;
806 
807  AssertTXNLsnOrder(rb);
808 
810  return InvalidTransactionId;
811 
812  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
814  return txn->base_snapshot->xmin;
815 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 523 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

524 {
525  Oid *relids;
526  Size alloc_len;
527 
528  alloc_len = sizeof(Oid) * nrelids;
529 
530  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
531 
532  return relids;
533 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:467
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 487 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

488 {
489  ReorderBufferTupleBuf *tuple;
490  Size alloc_len;
491 
492  alloc_len = tuple_len + SizeofHeapTupleHeader;
493 
494  tuple = (ReorderBufferTupleBuf *)
496  sizeof(ReorderBufferTupleBuf) +
497  MAXIMUM_ALIGNOF + alloc_len);
498  tuple->alloc_tuple_size = alloc_len;
499  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
500 
501  return tuple;
502 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:467
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 361 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

362 {
363  ReorderBufferTXN *txn;
364 
365  txn = (ReorderBufferTXN *)
367 
368  memset(txn, 0, sizeof(ReorderBufferTXN));
369 
370  dlist_init(&txn->changes);
371  dlist_init(&txn->tuplecids);
372  dlist_init(&txn->subtxns);
373 
374  return txn;
375 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2052 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

2054 {
2055  bool use_subtxn = IsTransactionOrTransactionBlock();
2056  int i;
2057 
2058  if (use_subtxn)
2059  BeginInternalSubTransaction("replay");
2060 
2061  /*
2062  * Force invalidations to happen outside of a valid transaction - that way
2063  * entries will just be marked as invalid without accessing the catalog.
2064  * That's advantageous because we don't need to setup the full state
2065  * necessary for catalog access.
2066  */
2067  if (use_subtxn)
2069 
2070  for (i = 0; i < ninvalidations; i++)
2071  LocalExecuteInvalidationMessage(&invalidations[i]);
2072 
2073  if (use_subtxn)
2075 }
void AbortCurrentTransaction(void)
Definition: xact.c:3162
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4464
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4359
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 992 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

993 {
995  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
996  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
997 
998  if (pos_a < pos_b)
999  return 1;
1000  else if (pos_a == pos_b)
1001  return 0;
1002  return -1;
1003 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1228 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferCommit().

1230 {
1231  int32 off;
1232 
1233  for (off = 0; off < state->nr_txns; off++)
1234  {
1235  if (state->entries[off].file.vfd != -1)
1236  FileClose(state->entries[off].file.vfd);
1237  }
1238 
1239  /* free memory we might have "leaked" in the last *Next call */
1240  if (!dlist_is_empty(&state->old_change))
1241  {
1242  ReorderBufferChange *change;
1243 
1244  change = dlist_container(ReorderBufferChange, node,
1245  dlist_pop_head_node(&state->old_change));
1246  ReorderBufferReturnChange(rb, change);
1247  Assert(dlist_is_empty(&state->old_change));
1248  }
1249 
1250  binaryheap_free(state->heap);
1251  pfree(state);
1252 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:347
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
void FileClose(File file)
Definition: fd.c:1748
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1015 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferCommit().

1017 {
1018  Size nr_txns = 0;
1020  dlist_iter cur_txn_i;
1021  int32 off;
1022 
1023  *iter_state = NULL;
1024 
1025  /*
1026  * Calculate the size of our heap: one element for every transaction that
1027  * contains changes. (Besides the transactions already in the reorder
1028  * buffer, we count the one we were directly passed.)
1029  */
1030  if (txn->nentries > 0)
1031  nr_txns++;
1032 
1033  dlist_foreach(cur_txn_i, &txn->subtxns)
1034  {
1035  ReorderBufferTXN *cur_txn;
1036 
1037  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1038 
1039  if (cur_txn->nentries > 0)
1040  nr_txns++;
1041  }
1042 
1043  /*
1044  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
1045  * need to allocate/build a heap then.
1046  */
1047 
1048  /* allocate iteration state */
1049  state = (ReorderBufferIterTXNState *)
1051  sizeof(ReorderBufferIterTXNState) +
1052  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1053 
1054  state->nr_txns = nr_txns;
1055  dlist_init(&state->old_change);
1056 
1057  for (off = 0; off < state->nr_txns; off++)
1058  {
1059  state->entries[off].file.vfd = -1;
1060  state->entries[off].segno = 0;
1061  }
1062 
1063  /* allocate heap */
1064  state->heap = binaryheap_allocate(state->nr_txns,
1066  state);
1067 
1068  /* Now that the state fields are initialized, it is safe to return it. */
1069  *iter_state = state;
1070 
1071  /*
1072  * Now insert items into the binary heap, in an unordered fashion. (We
1073  * will run a heap assembly step at the end; this is more efficient.)
1074  */
1075 
1076  off = 0;
1077 
1078  /* add toplevel transaction if it contains changes */
1079  if (txn->nentries > 0)
1080  {
1081  ReorderBufferChange *cur_change;
1082 
1083  if (rbtxn_is_serialized(txn))
1084  {
1085  /* serialize remaining changes */
1086  ReorderBufferSerializeTXN(rb, txn);
1087  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1088  &state->entries[off].segno);
1089  }
1090 
1091  cur_change = dlist_head_element(ReorderBufferChange, node,
1092  &txn->changes);
1093 
1094  state->entries[off].lsn = cur_change->lsn;
1095  state->entries[off].change = cur_change;
1096  state->entries[off].txn = txn;
1097 
1099  }
1100 
1101  /* add subtransactions if they contain changes */
1102  dlist_foreach(cur_txn_i, &txn->subtxns)
1103  {
1104  ReorderBufferTXN *cur_txn;
1105 
1106  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1107 
1108  if (cur_txn->nentries > 0)
1109  {
1110  ReorderBufferChange *cur_change;
1111 
1112  if (rbtxn_is_serialized(cur_txn))
1113  {
1114  /* serialize remaining changes */
1115  ReorderBufferSerializeTXN(rb, cur_txn);
1116  ReorderBufferRestoreChanges(rb, cur_txn,
1117  &state->entries[off].file,
1118  &state->entries[off].segno);
1119  }
1120  cur_change = dlist_head_element(ReorderBufferChange, node,
1121  &cur_txn->changes);
1122 
1123  state->entries[off].lsn = cur_change->lsn;
1124  state->entries[off].change = cur_change;
1125  state->entries[off].txn = cur_txn;
1126 
1128  }
1129  }
1130 
1131  /* assemble a valid binary heap */
1132  binaryheap_build(state->heap);
1133 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:347
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:467
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1142 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

1143 {
1144  ReorderBufferChange *change;
1146  int32 off;
1147 
1148  /* nothing there anymore */
1149  if (state->heap->bh_size == 0)
1150  return NULL;
1151 
1152  off = DatumGetInt32(binaryheap_first(state->heap));
1153  entry = &state->entries[off];
1154 
1155  /* free memory we might have "leaked" in the previous *Next call */
1156  if (!dlist_is_empty(&state->old_change))
1157  {
1158  change = dlist_container(ReorderBufferChange, node,
1159  dlist_pop_head_node(&state->old_change));
1160  ReorderBufferReturnChange(rb, change);
1161  Assert(dlist_is_empty(&state->old_change));
1162  }
1163 
1164  change = entry->change;
1165 
1166  /*
1167  * update heap with information about which transaction has the next
1168  * relevant change in LSN order
1169  */
1170 
1171  /* there are in-memory changes */
1172  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1173  {
1174  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1175  ReorderBufferChange *next_change =
1176  dlist_container(ReorderBufferChange, node, next);
1177 
1178  /* txn stays the same */
1179  state->entries[off].lsn = next_change->lsn;
1180  state->entries[off].change = next_change;
1181 
1183  return change;
1184  }
1185 
1186  /* try to load changes from disk */
1187  if (entry->txn->nentries != entry->txn->nentries_mem)
1188  {
1189  /*
1190  * Ugly: restoring changes will reuse *Change records, thus delete the
1191  * current one from the per-tx list and only free in the next call.
1192  */
1193  dlist_delete(&change->node);
1194  dlist_push_tail(&state->old_change, &change->node);
1195 
1196  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1197  &state->entries[off].segno))
1198  {
1199  /* successfully restored changes from disk */
1200  ReorderBufferChange *next_change =
1202  &entry->txn->changes);
1203 
1204  elog(DEBUG2, "restored %u/%u changes from disk",
1205  (uint32) entry->txn->nentries_mem,
1206  (uint32) entry->txn->nentries);
1207 
1208  Assert(entry->txn->nentries_mem);
1209  /* txn stays the same */
1210  state->entries[off].lsn = next_change->lsn;
1211  state->entries[off].change = next_change;
1213 
1214  return change;
1215  }
1216  }
1217 
1218  /* ok, no changes there anymore, remove */
1219  binaryheap_remove_first(state->heap);
1220 
1221  return change;
1222 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:217
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:347
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:359
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:228
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 2360 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

2361 {
2362  HASH_SEQ_STATUS hash_seq;
2364  ReorderBufferTXN *largest = NULL;
2365 
2366  hash_seq_init(&hash_seq, rb->by_txn);
2367  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2368  {
2369  ReorderBufferTXN *txn = ent->txn;
2370 
2371  /* if the current transaction is larger, remember it */
2372  if ((!largest) || (txn->size > largest->size))
2373  largest = txn;
2374  }
2375 
2376  Assert(largest);
2377  Assert(largest->size > 0);
2378  Assert(largest->size <= rb->size);
2379 
2380  return largest;
2381 }
#define Assert(condition)
Definition: c.h:739
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
ReorderBufferTXN * txn

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2088 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2089 {
2090  /* many records won't have an xid assigned, centralize check here */
2091  if (xid != InvalidTransactionId)
2092  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2093 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 634 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

636 {
637  ReorderBufferTXN *txn;
638 
639  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
640 
641  change->lsn = lsn;
642  change->txn = txn;
643 
644  Assert(InvalidXLogRecPtr != lsn);
645  dlist_push_tail(&txn->changes, &change->node);
646  txn->nentries++;
647  txn->nentries_mem++;
648 
649  /* update memory accounting information */
650  ReorderBufferChangeMemoryUpdate(rb, change, true);
651 
652  /* check the memory limits and evict something if needed */
654 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 660 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

664 {
665  if (transactional)
666  {
667  MemoryContext oldcontext;
668  ReorderBufferChange *change;
669 
671 
672  oldcontext = MemoryContextSwitchTo(rb->context);
673 
674  change = ReorderBufferGetChange(rb);
676  change->data.msg.prefix = pstrdup(prefix);
677  change->data.msg.message_size = message_size;
678  change->data.msg.message = palloc(message_size);
679  memcpy(change->data.msg.message, message, message_size);
680 
681  ReorderBufferQueueChange(rb, xid, lsn, change);
682 
683  MemoryContextSwitchTo(oldcontext);
684  }
685  else
686  {
687  ReorderBufferTXN *txn = NULL;
688  volatile Snapshot snapshot_now = snapshot;
689 
690  if (xid != InvalidTransactionId)
691  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
692 
693  /* setup snapshot to allow catalog access */
694  SetupHistoricSnapshot(snapshot_now, NULL);
695  PG_TRY();
696  {
697  rb->message(rb, txn, lsn, false, prefix, message_size, message);
698 
700  }
701  PG_CATCH();
702  {
704  PG_RE_THROW();
705  }
706  PG_END_TRY();
707  }
708 }
struct ReorderBufferChange::@101::@104 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:332
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
#define PG_RE_THROW()
Definition: elog.h:363
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:949
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2926 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2928 {
2929  ReorderBufferDiskChange *ondisk;
2930  ReorderBufferChange *change;
2931 
2932  ondisk = (ReorderBufferDiskChange *) data;
2933 
2934  change = ReorderBufferGetChange(rb);
2935 
2936  /* copy static part */
2937  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2938 
2939  data += sizeof(ReorderBufferDiskChange);
2940 
2941  /* restore individual stuff */
2942  switch (change->action)
2943  {
2944  /* fall through these, they're all similar enough */
2949  if (change->data.tp.oldtuple)
2950  {
2951  uint32 tuplelen = ((HeapTuple) data)->t_len;
2952 
2953  change->data.tp.oldtuple =
2955 
2956  /* restore ->tuple */
2957  memcpy(&change->data.tp.oldtuple->tuple, data,
2958  sizeof(HeapTupleData));
2959  data += sizeof(HeapTupleData);
2960 
2961  /* reset t_data pointer into the new tuplebuf */
2962  change->data.tp.oldtuple->tuple.t_data =
2963  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2964 
2965  /* restore tuple data itself */
2966  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2967  data += tuplelen;
2968  }
2969 
2970  if (change->data.tp.newtuple)
2971  {
2972  /* here, data might not be suitably aligned! */
2973  uint32 tuplelen;
2974 
2975  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2976  sizeof(uint32));
2977 
2978  change->data.tp.newtuple =
2980 
2981  /* restore ->tuple */
2982  memcpy(&change->data.tp.newtuple->tuple, data,
2983  sizeof(HeapTupleData));
2984  data += sizeof(HeapTupleData);
2985 
2986  /* reset t_data pointer into the new tuplebuf */
2987  change->data.tp.newtuple->tuple.t_data =
2988  ReorderBufferTupleBufData(change->data.tp.newtuple);
2989 
2990  /* restore tuple data itself */
2991  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2992  data += tuplelen;
2993  }
2994 
2995  break;
2997  {
2998  Size prefix_size;
2999 
3000  /* read prefix */
3001  memcpy(&prefix_size, data, sizeof(Size));
3002  data += sizeof(Size);
3003  change->data.msg.prefix = MemoryContextAlloc(rb->context,
3004  prefix_size);
3005  memcpy(change->data.msg.prefix, data, prefix_size);
3006  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
3007  data += prefix_size;
3008 
3009  /* read the message */
3010  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3011  data += sizeof(Size);
3012  change->data.msg.message = MemoryContextAlloc(rb->context,
3013  change->data.msg.message_size);
3014  memcpy(change->data.msg.message, data,
3015  change->data.msg.message_size);
3016  data += change->data.msg.message_size;
3017 
3018  break;
3019  }
3021  {
3022  Snapshot oldsnap;
3023  Snapshot newsnap;
3024  Size size;
3025 
3026  oldsnap = (Snapshot) data;
3027 
3028  size = sizeof(SnapshotData) +
3029  sizeof(TransactionId) * oldsnap->xcnt +
3030  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3031 
3032  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3033 
3034  newsnap = change->data.snapshot;
3035 
3036  memcpy(newsnap, data, size);
3037  newsnap->xip = (TransactionId *)
3038  (((char *) newsnap) + sizeof(SnapshotData));
3039  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3040  newsnap->copied = true;
3041  break;
3042  }
3043  /* the base struct contains all the data, easy peasy */
3045  {
3046  Oid *relids;
3047 
3048  relids = ReorderBufferGetRelids(rb,
3049  change->data.truncate.nrelids);
3050  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3051  change->data.truncate.relids = relids;
3052 
3053  break;
3054  }
3058  break;
3059  }
3060 
3061  dlist_push_tail(&txn->changes, &change->node);
3062  txn->nentries_mem++;
3063 
3064  /*
3065  * Update memory accounting for the restored change. We need to do this
3066  * although we don't check the memory limit when restoring the changes in
3067  * this branch (we only do that when initially queueing the changes after
3068  * decoding), because we will release the changes later, and that will
3069  * update the accounting too (subtracting the size from the counters). And
3070  * we don't want to underflow there.
3071  */
3072  ReorderBufferChangeMemoryUpdate(rb, change, true);
3073 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:359
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:662
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 2785 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2787 {
2788  Size restored = 0;
2789  XLogSegNo last_segno;
2790  dlist_mutable_iter cleanup_iter;
2791  File *fd = &file->vfd;
2792 
2795 
2796  /* free current entries, so we have memory for more */
2797  dlist_foreach_modify(cleanup_iter, &txn->changes)
2798  {
2800  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2801 
2802  dlist_delete(&cleanup->node);
2803  ReorderBufferReturnChange(rb, cleanup);
2804  }
2805  txn->nentries_mem = 0;
2806  Assert(dlist_is_empty(&txn->changes));
2807 
2808  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2809 
2810  while (restored < max_changes_in_memory && *segno <= last_segno)
2811  {
2812  int readBytes;
2813  ReorderBufferDiskChange *ondisk;
2814 
2815  if (*fd == -1)
2816  {
2817  char path[MAXPGPATH];
2818 
2819  /* first time in */
2820  if (*segno == 0)
2821  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2822 
2823  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2824 
2825  /*
2826  * No need to care about TLIs here, only used during a single run,
2827  * so each LSN only maps to a specific WAL record.
2828  */
2830  *segno);
2831 
2832  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2833 
2834  /* No harm in resetting the offset even in case of failure */
2835  file->curOffset = 0;
2836 
2837  if (*fd < 0 && errno == ENOENT)
2838  {
2839  *fd = -1;
2840  (*segno)++;
2841  continue;
2842  }
2843  else if (*fd < 0)
2844  ereport(ERROR,
2846  errmsg("could not open file \"%s\": %m",
2847  path)));
2848  }
2849 
2850  /*
2851  * Read the statically sized part of a change which has information
2852  * about the total size. If we couldn't read a record, we're at the
2853  * end of this file.
2854  */
2856  readBytes = FileRead(file->vfd, rb->outbuf,
2857  sizeof(ReorderBufferDiskChange),
2859 
2860  /* eof */
2861  if (readBytes == 0)
2862  {
2863  FileClose(*fd);
2864  *fd = -1;
2865  (*segno)++;
2866  continue;
2867  }
2868  else if (readBytes < 0)
2869  ereport(ERROR,
2871  errmsg("could not read from reorderbuffer spill file: %m")));
2872  else if (readBytes != sizeof(ReorderBufferDiskChange))
2873  ereport(ERROR,
2875  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2876  readBytes,
2877  (uint32) sizeof(ReorderBufferDiskChange))));
2878 
2879  file->curOffset += readBytes;
2880 
2881  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2882 
2884  sizeof(ReorderBufferDiskChange) + ondisk->size);
2885  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2886 
2887  readBytes = FileRead(file->vfd,
2888  rb->outbuf + sizeof(ReorderBufferDiskChange),
2889  ondisk->size - sizeof(ReorderBufferDiskChange),
2890  file->curOffset,
2892 
2893  if (readBytes < 0)
2894  ereport(ERROR,
2896  errmsg("could not read from reorderbuffer spill file: %m")));
2897  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2898  ereport(ERROR,
2900  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2901  readBytes,
2902  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2903 
2904  file->curOffset += readBytes;
2905 
2906  /*
2907  * ok, read a full change from disk, now restore it into proper
2908  * in-memory format
2909  */
2910  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2911  restored++;
2912  }
2913 
2914  return restored;
2915 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1358
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1221
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:359
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:901
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
void FileClose(File file)
Definition: fd.c:1748
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:467
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:822
static const Size max_changes_in_memory
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1895
int File
Definition: fd.h:45
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3079 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN().

3080 {
3081  XLogSegNo first;
3082  XLogSegNo cur;
3083  XLogSegNo last;
3084 
3087 
3088  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3089  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3090 
3091  /* iterate over all possible filenames, and delete them */
3092  for (cur = first; cur <= last; cur++)
3093  {
3094  char path[MAXPGPATH];
3095 
3097  if (unlink(path) != 0 && errno != ENOENT)
3098  ereport(ERROR,
3100  errmsg("could not remove file \"%s\": %m", path)));
3101  }
3102 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:112
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 426 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

427 {
428  /* update memory accounting info */
429  ReorderBufferChangeMemoryUpdate(rb, change, false);
430 
431  /* free contained data */
432  switch (change->action)
433  {
438  if (change->data.tp.newtuple)
439  {
440  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
441  change->data.tp.newtuple = NULL;
442  }
443 
444  if (change->data.tp.oldtuple)
445  {
446  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
447  change->data.tp.oldtuple = NULL;
448  }
449  break;
451  if (change->data.msg.prefix != NULL)
452  pfree(change->data.msg.prefix);
453  change->data.msg.prefix = NULL;
454  if (change->data.msg.message != NULL)
455  pfree(change->data.msg.message);
456  change->data.msg.message = NULL;
457  break;
459  if (change->data.snapshot)
460  {
461  ReorderBufferFreeSnap(rb, change->data.snapshot);
462  change->data.snapshot = NULL;
463  }
464  break;
465  /* no data in addition to the struct itself */
467  if (change->data.truncate.relids != NULL)
468  {
469  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
470  change->data.truncate.relids = NULL;
471  }
472  break;
476  break;
477  }
478 
479  pfree(change);
480 }
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void pfree(void *pointer)
Definition: mcxt.c:1056
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
union ReorderBufferChange::@101 data
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 539 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

540 {
541  pfree(relids);
542 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 508 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

509 {
510  pfree(tuple);
511 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 381 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

382 {
383  /* clean the lookup cache if we were cached (quite likely) */
384  if (rb->by_txn_last_xid == txn->xid)
385  {
387  rb->by_txn_last_txn = NULL;
388  }
389 
390  /* free data that's contained */
391 
392  if (txn->tuplecid_hash != NULL)
393  {
395  txn->tuplecid_hash = NULL;
396  }
397 
398  if (txn->invalidations)
399  {
400  pfree(txn->invalidations);
401  txn->invalidations = NULL;
402  }
403 
404  pfree(txn);
405 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1056
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2519 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2521 {
2522  ReorderBufferDiskChange *ondisk;
2523  Size sz = sizeof(ReorderBufferDiskChange);
2524 
2526 
2527  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2528  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2529 
2530  switch (change->action)
2531  {
2532  /* fall through these, they're all similar enough */
2537  {
2538  char *data;
2539  ReorderBufferTupleBuf *oldtup,
2540  *newtup;
2541  Size oldlen = 0;
2542  Size newlen = 0;
2543 
2544  oldtup = change->data.tp.oldtuple;
2545  newtup = change->data.tp.newtuple;
2546 
2547  if (oldtup)
2548  {
2549  sz += sizeof(HeapTupleData);
2550  oldlen = oldtup->tuple.t_len;
2551  sz += oldlen;
2552  }
2553 
2554  if (newtup)
2555  {
2556  sz += sizeof(HeapTupleData);
2557  newlen = newtup->tuple.t_len;
2558  sz += newlen;
2559  }
2560 
2561  /* make sure we have enough space */
2563 
2564  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2565  /* might have been reallocated above */
2566  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2567 
2568  if (oldlen)
2569  {
2570  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2571  data += sizeof(HeapTupleData);
2572 
2573  memcpy(data, oldtup->tuple.t_data, oldlen);
2574  data += oldlen;
2575  }
2576 
2577  if (newlen)
2578  {
2579  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2580  data += sizeof(HeapTupleData);
2581 
2582  memcpy(data, newtup->tuple.t_data, newlen);
2583  data += newlen;
2584  }
2585  break;
2586  }
2588  {
2589  char *data;
2590  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2591 
2592  sz += prefix_size + change->data.msg.message_size +
2593  sizeof(Size) + sizeof(Size);
2595 
2596  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2597 
2598  /* might have been reallocated above */
2599  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2600 
2601  /* write the prefix including the size */
2602  memcpy(data, &prefix_size, sizeof(Size));
2603  data += sizeof(Size);
2604  memcpy(data, change->data.msg.prefix,
2605  prefix_size);
2606  data += prefix_size;
2607 
2608  /* write the message including the size */
2609  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2610  data += sizeof(Size);
2611  memcpy(data, change->data.msg.message,
2612  change->data.msg.message_size);
2613  data += change->data.msg.message_size;
2614 
2615  break;
2616  }
2618  {
2619  Snapshot snap;
2620  char *data;
2621 
2622  snap = change->data.snapshot;
2623 
2624  sz += sizeof(SnapshotData) +
2625  sizeof(TransactionId) * snap->xcnt +
2626  sizeof(TransactionId) * snap->subxcnt
2627  ;
2628 
2629  /* make sure we have enough space */
2631  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2632  /* might have been reallocated above */
2633  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2634 
2635  memcpy(data, snap, sizeof(SnapshotData));
2636  data += sizeof(SnapshotData);
2637 
2638  if (snap->xcnt)
2639  {
2640  memcpy(data, snap->xip,
2641  sizeof(TransactionId) * snap->xcnt);
2642  data += sizeof(TransactionId) * snap->xcnt;
2643  }
2644 
2645  if (snap->subxcnt)
2646  {
2647  memcpy(data, snap->subxip,
2648  sizeof(TransactionId) * snap->subxcnt);
2649  data += sizeof(TransactionId) * snap->subxcnt;
2650  }
2651  break;
2652  }
2654  {
2655  Size size;
2656  char *data;
2657 
2658  /* account for the OIDs of truncated relations */
2659  size = sizeof(Oid) * change->data.truncate.nrelids;
2660  sz += size;
2661 
2662  /* make sure we have enough space */
2664 
2665  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2666  /* might have been reallocated above */
2667  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2668 
2669  memcpy(data, change->data.truncate.relids, size);
2670  data += size;
2671 
2672  break;
2673  }
2677  /* ReorderBufferChange contains everything important */
2678  break;
2679  }
2680 
2681  ondisk->size = sz;
2682 
2683  errno = 0;
2685  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2686  {
2687  int save_errno = errno;
2688 
2690 
2691  /* if write didn't set errno, assume problem is no disk space */
2692  errno = save_errno ? save_errno : ENOSPC;
2693  ereport(ERROR,
2695  errmsg("could not write to data file for XID %u: %m",
2696  txn->xid)));
2697  }
2699 
2700  Assert(ondisk->change.action == change->action);
2701 }
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
#define write(a, b, c)
Definition: win32.h:14
struct ReorderBufferChange::@101::@104 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:68
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:631
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1344
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId * xip
Definition: snapshot.h:168
int CloseTransientFile(int fd)
Definition: fd.c:2469
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1320
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:822
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:180
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 3148 of file reorderbuffer.c.

References ReplicationSlot::data, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

3150 {
3151  XLogRecPtr recptr;
3152 
3153  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
3154 
3155  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
3157  xid,
3158  (uint32) (recptr >> 32), (uint32) recptr);
3159 }
int wal_segment_size
Definition: xlog.c:112
ReplicationSlotPersistentData data
Definition: slot.h:132
#define MAXPGPATH
unsigned int uint32
Definition: c.h:359
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define NameStr(name)
Definition: c.h:616
#define snprintf
Definition: port.h:192

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2334 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2335 {
2336  if (!rb->outbufsize)
2337  {
2338  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2339  rb->outbufsize = sz;
2340  }
2341  else if (rb->outbufsize < sz)
2342  {
2343  rb->outbuf = repalloc(rb->outbuf, sz);
2344  rb->outbufsize = sz;
2345  }
2346 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2433 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

2434 {
2435  dlist_iter subtxn_i;
2436  dlist_mutable_iter change_i;
2437  int fd = -1;
2438  XLogSegNo curOpenSegNo = 0;
2439  Size spilled = 0;
2440  Size size = txn->size;
2441 
2442  elog(DEBUG2, "spill %u changes in XID %u to disk",
2443  (uint32) txn->nentries_mem, txn->xid);
2444 
2445  /* do the same to all child TXs */
2446  dlist_foreach(subtxn_i, &txn->subtxns)
2447  {
2448  ReorderBufferTXN *subtxn;
2449 
2450  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2451  ReorderBufferSerializeTXN(rb, subtxn);
2452  }
2453 
2454  /* serialize changestream */
2455  dlist_foreach_modify(change_i, &txn->changes)
2456  {
2457  ReorderBufferChange *change;
2458 
2459  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2460 
2461  /*
2462  * store in segment in which it belongs by start lsn, don't split over
2463  * multiple segments tho
2464  */
2465  if (fd == -1 ||
2466  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2467  {
2468  char path[MAXPGPATH];
2469 
2470  if (fd != -1)
2471  CloseTransientFile(fd);
2472 
2473  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2474 
2475  /*
2476  * No need to care about TLIs here, only used during a single run,
2477  * so each LSN only maps to a specific WAL record.
2478  */
2480  curOpenSegNo);
2481 
2482  /* open segment, create it if necessary */
2483  fd = OpenTransientFile(path,
2484  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2485 
2486  if (fd < 0)
2487  ereport(ERROR,
2489  errmsg("could not open file \"%s\": %m", path)));
2490  }
2491 
2492  ReorderBufferSerializeChange(rb, txn, fd, change);
2493  dlist_delete(&change->node);
2494  ReorderBufferReturnChange(rb, change);
2495 
2496  spilled++;
2497  }
2498 
2499  /* update the statistics */
2500  rb->spillCount += 1;
2501  rb->spillBytes += size;
2502 
2503  /* Don't consider already serialized transactions. */
2504  rb->spillTxns += rbtxn_is_serialized(txn) ? 0 : 1;
2505 
2506  Assert(spilled == txn->nentries_mem);
2507  Assert(dlist_is_empty(&txn->changes));
2508  txn->nentries_mem = 0;
2510 
2511  if (fd != -1)
2512  CloseTransientFile(fd);
2513 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1221
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:359
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:467
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define RBTXN_IS_SERIALIZED
#define elog(elevel,...)
Definition: elog.h:228
#define rbtxn_is_serialized(txn)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2119 of file reorderbuffer.c.

References Assert, AssertArg, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

2121 {
2122  ReorderBufferTXN *txn;
2123  bool is_new;
2124 
2125  AssertArg(snap != NULL);
2126 
2127  /*
2128  * Fetch the transaction to operate on. If we know it's a subtransaction,
2129  * operate on its top-level transaction instead.
2130  */
2131  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2132  if (rbtxn_is_known_subxact(txn))
2133  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2134  NULL, InvalidXLogRecPtr, false);
2135  Assert(txn->base_snapshot == NULL);
2136 
2137  txn->base_snapshot = snap;
2138  txn->base_snapshot_lsn = lsn;
2140 
2141  AssertTXNLsnOrder(rb);
2142 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node base_snapshot_node
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
dlist_head txns_by_base_snapshot_lsn
#define AssertArg(condition)
Definition: c.h:741
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 818 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

819 {
821 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 3221 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

3223 {
3224  ReorderBufferToastEnt *ent;
3225  ReorderBufferTupleBuf *newtup;
3226  bool found;
3227  int32 chunksize;
3228  bool isnull;
3229  Pointer chunk;
3230  TupleDesc desc = RelationGetDescr(relation);
3231  Oid chunk_id;
3232  int32 chunk_seq;
3233 
3234  if (txn->toast_hash == NULL)
3235  ReorderBufferToastInitHash(rb, txn);
3236 
3237  Assert(IsToastRelation(relation));
3238 
3239  newtup = change->data.tp.newtuple;
3240  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
3241  Assert(!isnull);
3242  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
3243  Assert(!isnull);
3244 
3245  ent = (ReorderBufferToastEnt *)
3246  hash_search(txn->toast_hash,
3247  (void *) &chunk_id,
3248  HASH_ENTER,
3249  &found);
3250 
3251  if (!found)
3252  {
3253  Assert(ent->chunk_id == chunk_id);
3254  ent->num_chunks = 0;
3255  ent->last_chunk_seq = 0;
3256  ent->size = 0;
3257  ent->reconstructed = NULL;
3258  dlist_init(&ent->chunks);
3259 
3260  if (chunk_seq != 0)
3261  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3262  chunk_seq, chunk_id);
3263  }
3264  else if (found && chunk_seq != ent->last_chunk_seq + 1)
3265  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3266  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3267 
3268  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3269  Assert(!isnull);
3270 
3271  /* calculate size so we can allocate the right size at once later */
3272  if (!VARATT_IS_EXTENDED(chunk))
3273  chunksize = VARSIZE(chunk) - VARHDRSZ;
3274  else if (VARATT_IS_SHORT(chunk))
3275  /* could happen due to heap_form_tuple doing its thing */
3276  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3277  else
3278  elog(ERROR, "unexpected type of toast chunk");
3279 
3280  ent->size += chunksize;
3281  ent->last_chunk_seq = chunk_seq;
3282  ent->num_chunks++;
3283  dlist_push_tail(&ent->chunks, &change->node);
3284 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:141
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:454
struct ReorderBufferChange::@101::@102 tp
#define VARHDRSZ_SHORT
Definition: postgres.h:268
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARHDRSZ
Definition: c.h:562
#define DatumGetObjectId(X)
Definition: postgres.h:500
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:347
char * Pointer
Definition: c.h:336
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:29
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define DatumGetPointer(X)
Definition: postgres.h:549
#define elog(elevel,...)
Definition: elog.h:228
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3200 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

3201 {
3202  HASHCTL hash_ctl;
3203 
3204  Assert(txn->toast_hash == NULL);
3205 
3206  memset(&hash_ctl, 0, sizeof(hash_ctl));
3207  hash_ctl.keysize = sizeof(Oid);
3208  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
3209  hash_ctl.hcxt = rb->context;
3210  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
3212 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:739

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 3307 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, elog, ERROR, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

3309 {
3310  TupleDesc desc;
3311  int natt;
3312  Datum *attrs;
3313  bool *isnull;
3314  bool *free;
3315  HeapTuple tmphtup;
3316  Relation toast_rel;
3317  TupleDesc toast_desc;
3318  MemoryContext oldcontext;
3319  ReorderBufferTupleBuf *newtup;
3320 
3321  /* no toast tuples changed */
3322  if (txn->toast_hash == NULL)
3323  return;
3324 
3325  /*
3326  * We're going to modify the size of the change, so to make sure the
3327  * accounting is correct we'll make it look like we're removing the change
3328  * now (with the old size), and then re-add it at the end.
3329  */
3330  ReorderBufferChangeMemoryUpdate(rb, change, false);
3331 
3332  oldcontext = MemoryContextSwitchTo(rb->context);
3333 
3334  /* we should only have toast tuples in an INSERT or UPDATE */
3335  Assert(change->data.tp.newtuple);
3336 
3337  desc = RelationGetDescr(relation);
3338 
3339  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3340  if (!RelationIsValid(toast_rel))
3341  elog(ERROR, "could not open relation with OID %u",
3342  relation->rd_rel->reltoastrelid);
3343 
3344  toast_desc = RelationGetDescr(toast_rel);
3345 
3346  /* should we allocate from stack instead? */
3347  attrs = palloc0(sizeof(Datum) * desc->natts);
3348  isnull = palloc0(sizeof(bool) * desc->natts);
3349  free = palloc0(sizeof(bool) * desc->natts);
3350 
3351  newtup = change->data.tp.newtuple;
3352 
3353  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3354 
3355  for (natt = 0; natt < desc->natts; natt++)
3356  {
3357  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3358  ReorderBufferToastEnt *ent;
3359  struct varlena *varlena;
3360 
3361  /* va_rawsize is the size of the original datum -- including header */
3362  struct varatt_external toast_pointer;
3363  struct varatt_indirect redirect_pointer;
3364  struct varlena *new_datum = NULL;
3365  struct varlena *reconstructed;
3366  dlist_iter it;
3367  Size data_done = 0;
3368 
3369  /* system columns aren't toasted */
3370  if (attr->attnum < 0)
3371  continue;
3372 
3373  if (attr->attisdropped)
3374  continue;
3375 
3376  /* not a varlena datatype */
3377  if (attr->attlen != -1)
3378  continue;
3379 
3380  /* no data */
3381  if (isnull[natt])
3382  continue;
3383 
3384  /* ok, we know we have a toast datum */
3385  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3386 
3387  /* no need to do anything if the tuple isn't external */
3388  if (!VARATT_IS_EXTERNAL(varlena))
3389  continue;
3390 
3391  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3392 
3393  /*
3394  * Check whether the toast tuple changed, replace if so.
3395  */
3396  ent = (ReorderBufferToastEnt *)
3397  hash_search(txn->toast_hash,
3398  (void *) &toast_pointer.va_valueid,
3399  HASH_FIND,
3400  NULL);
3401  if (ent == NULL)
3402  continue;
3403 
3404  new_datum =
3405  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3406 
3407  free[natt] = true;
3408 
3409  reconstructed = palloc0(toast_pointer.va_rawsize);
3410 
3411  ent->reconstructed = reconstructed;
3412 
3413  /* stitch toast tuple back together from its parts */
3414  dlist_foreach(it, &ent->chunks)
3415  {
3416  bool isnull;
3417  ReorderBufferChange *cchange;
3418  ReorderBufferTupleBuf *ctup;
3419  Pointer chunk;
3420 
3421  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3422  ctup = cchange->data.tp.newtuple;
3423  chunk = DatumGetPointer(
3424  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3425 
3426  Assert(!isnull);
3427  Assert(!VARATT_IS_EXTERNAL(chunk));
3428  Assert(!VARATT_IS_SHORT(chunk));
3429 
3430  memcpy(VARDATA(reconstructed) + data_done,
3431  VARDATA(chunk),
3432  VARSIZE(chunk) - VARHDRSZ);
3433  data_done += VARSIZE(chunk) - VARHDRSZ;
3434  }
3435  Assert(data_done == toast_pointer.va_extsize);
3436 
3437  /* make sure its marked as compressed or not */
3438  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3439  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3440  else
3441  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3442 
3443  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3444  redirect_pointer.pointer = reconstructed;
3445 
3447  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3448  sizeof(redirect_pointer));
3449 
3450  attrs[natt] = PointerGetDatum(new_datum);
3451  }
3452 
3453  /*
3454  * Build tuple in separate memory & copy tuple back into the tuplebuf
3455  * passed to the output plugin. We can't directly heap_fill_tuple() into
3456  * the tuplebuf because attrs[] will point back into the current content.
3457  */
3458  tmphtup = heap_form_tuple(desc, attrs, isnull);
3459  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3460  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3461 
3462  memcpy(newtup->tu