PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB * tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3579 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3580 {
3581  char path[MAXPGPATH];
3582  int fd;
3583  int readBytes;
3584  LogicalRewriteMappingData map;
3585 
3586  sprintf(path, "pg_logical/mappings/%s", fname);
3587  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3588  if (fd < 0)
3589  ereport(ERROR,
3590  (errcode_for_file_access(),
3591  errmsg("could not open file \"%s\": %m", path)));
3592 
3593  while (true)
3594  {
3595  ReorderBufferTupleCidKey key;
3596  ReorderBufferTupleCidEnt *ent;
3597  ReorderBufferTupleCidEnt *new_ent;
3598  bool found;
3599 
3600  /* be careful about padding */
3601  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3602 
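3603  /* read all mappings till the end of the file */
3604  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3605  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3606  pgstat_report_wait_end();
3607 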
3608  if (readBytes < 0)
3609  ereport(ERROR,
3610  (errcode_for_file_access(),
3611  errmsg("could not read file \"%s\": %m",
3612  path)));
3613  else if (readBytes == 0) /* EOF */
3614  break;
3615  else if (readBytes != sizeof(LogicalRewriteMappingData))
3616  ereport(ERROR,
3617  (errcode_for_file_access(),
3618  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3619  path, readBytes,
3620  (int32) sizeof(LogicalRewriteMappingData))));
3621 
3622  key.relnode = map.old_node;
3623  ItemPointerCopy(&map.old_tid,
3624  &key.tid);
3625 
3626 
3627  ent = (ReorderBufferTupleCidEnt *)
3628  hash_search(tuplecid_data,
3629  (void *) &key,
3630  HASH_FIND,
3631  NULL);
3632 
3633  /* no existing mapping, no need to update */
3634  if (!ent)
3635  continue;
3636 
3637  key.relnode = map.new_node;
3638  ItemPointerCopy(&map.new_tid,
3639  &key.tid);
3640 
3641  new_ent = (ReorderBufferTupleCidEnt *)
3642  hash_search(tuplecid_data,
3643  (void *) &key,
3644  HASH_ENTER,
3645  &found);
3646 
3647  if (found)
3648  {
3649  /*
3650  * Make sure the existing mapping makes sense. We sometime update
3651  * old records that did not yet have a cmax (e.g. pg_class' own
3652  * entry while rewriting it) during rewrites, so allow that.
3653  */
3654  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3655  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3656  }
3657  else
3658  {
3659  /* update mapping */
3660  new_ent->cmin = ent->cmin;
3661  new_ent->cmax = ent->cmax;
3662  new_ent->combocid = ent->combocid;
3663  }
3664  }
3665 
3666  if (CloseTransientFile(fd) != 0)
3667  ereport(ERROR,
3668  (errcode_for_file_access(),
3669  errmsg("could not close file \"%s\": %m", path)));
3670 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1233
signed int int32
Definition: c.h:355
#define sprintf
Definition: port.h:195
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define InvalidCommandId
Definition: c.h:530
#define ereport(elevel,...)
Definition: elog.h:144
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:738
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1357
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer * rb)
static

Definition at line 719 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

720 {
721 #ifdef USE_ASSERT_CHECKING
722  dlist_iter iter;
723  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
724  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
725 
726  dlist_foreach(iter, &rb->toplevel_by_lsn)
727  {
728  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
729  iter.cur);
730 
731  /* start LSN must be set */
732  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
733 
734  /* If there is an end LSN, it must be higher than start LSN */
735  if (cur_txn->end_lsn != InvalidXLogRecPtr)
736  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
737 
738  /* Current initial LSN must be strictly higher than previous */
739  if (prev_first_lsn != InvalidXLogRecPtr)
740  Assert(prev_first_lsn < cur_txn->first_lsn);
741 
742  /* known-as-subtxn txns must not be listed */
743  Assert(!rbtxn_is_known_subxact(cur_txn));
744 
745  prev_first_lsn = cur_txn->first_lsn;
746  }
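747 
748  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
749  {
750  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
751  base_snapshot_node,
752  iter.cur);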
753 
754  /* base snapshot (and its LSN) must be set */
755  Assert(cur_txn->base_snapshot != NULL);
756  Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
757 
758  /* current LSN must be strictly higher than previous */
759  if (prev_base_snap_lsn != InvalidXLogRecPtr)
760  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
761 
762  /* known-as-subtxn txns must not be listed */
763  Assert(!rbtxn_is_known_subxact(cur_txn));
764 
765  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
766  }
767 #endif
768 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell * a_p,
const ListCell * b_p 
)
static

Definition at line 3687 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

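3688 {
3689  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
3690  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
3691 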
3692  if (a->lsn < b->lsn)
3693  return -1;
3694  else if (a->lsn > b->lsn)
3695  return 1;
3696  return 0;
3697 }
#define lfirst(lc)
Definition: pg_list.h:190

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1924 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1925 {
1926  ReorderBufferTXN *txn;
1927 
1928  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1929  false);
1930 
1931  /* unknown, nothing to remove */
1932  if (txn == NULL)
1933  return;
1934 
1935  /* cosmetic... */
1936  txn->final_lsn = lsn;
1937 
1938  /* remove potential on-disk data, and deallocate */
1939  ReorderBufferCleanupTXN(rb, txn);
1940 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 1950 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1951 {
1952  dlist_mutable_iter it;
1953 
1954  /*
1955  * Iterate through all (potential) toplevel TXNs and abort all that are
1956  * older than what possibly can be running. Once we've found the first
1957  * that is alive we stop, there might be some that acquired an xid earlier
1958  * but started writing later, but it's unlikely and they will be cleaned
1959  * up in a later call to this function.
1960  */
1961  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1962  {
1963  ReorderBufferTXN *txn;
1964 
1965  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1966 
1967  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1968  {
1969  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1970 
1971  /* remove potential on-disk data, and deallocate this tx */
1972  ReorderBufferCleanupTXN(rb, txn);
1973  }
1974  else
1975  return;
1976  }
1977 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 2211 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2214 {
2215  ReorderBufferTXN *txn;
2216 
2217  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2218 
2219  if (txn->ninvalidations != 0)
2220  elog(ERROR, "only ever add one set of invalidations");
2221 
2222  Assert(nmsgs > 0);
2223 
2224  txn->ninvalidations = nmsgs;
2225  txn->invalidations = (SharedInvalidationMessage *)
2226  MemoryContextAlloc(rb->context,
2227  sizeof(SharedInvalidationMessage) * nmsgs);
2228  memcpy(txn->invalidations, msgs,
2229  sizeof(SharedInvalidationMessage) * nmsgs);
2230 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:738
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2127 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

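2129 {
2130  ReorderBufferChange *change = ReorderBufferGetChange(rb);
2131 
2132  change->data.command_id = cid;
2133  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2134 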
2135  ReorderBufferQueueChange(rb, xid, lsn, change);
2136 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2182 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2186 {
2187  ReorderBufferChange *change = ReorderBufferGetChange(rb);
2188  ReorderBufferTXN *txn;
2189 
2190  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2191 
2192  change->data.tuplecid.node = node;
2193  change->data.tuplecid.tid = tid;
2194  change->data.tuplecid.cmin = cmin;
2195  change->data.tuplecid.cmax = cmax;
2196  change->data.tuplecid.combocid = combocid;
2197  change->lsn = lsn;
2198  change->txn = txn;
2199  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2200 
2201  dlist_push_tail(&txn->tuplecids, &change->node);
2202  txn->ntuplecids++;
2203 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@99::@103 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2078 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

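2080 {
2081  ReorderBufferChange *change = ReorderBufferGetChange(rb);
2082 
2083  change->data.snapshot = snap;
2084  change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2085 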
2086  ReorderBufferQueueChange(rb, xid, lsn, change);
2087 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 272 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

273 {
274  ReorderBuffer *buffer;
275  HASHCTL hash_ctl;
276  MemoryContext new_ctx;
277 
278  Assert(MyReplicationSlot != NULL);
279 
280  /* allocate memory in own context, to have better accountability */
281  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
282  "ReorderBuffer",
283  ALLOCSET_DEFAULT_SIZES);
284 
285  buffer =
286  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
287 
288  memset(&hash_ctl, 0, sizeof(hash_ctl));
289 
290  buffer->context = new_ctx;
291 
292  buffer->change_context = SlabContextCreate(new_ctx,
293  "Change",
294  SLAB_DEFAULT_BLOCK_SIZE,
295  sizeof(ReorderBufferChange));
296 
297  buffer->txn_context = SlabContextCreate(new_ctx,
298  "TXN",
299  SLAB_DEFAULT_BLOCK_SIZE,
300  sizeof(ReorderBufferTXN));
301 
302  buffer->tup_context = GenerationContextCreate(new_ctx,
303  "Tuples",
304  SLAB_LARGE_BLOCK_SIZE);
305 
306  hash_ctl.keysize = sizeof(TransactionId);
307  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
308  hash_ctl.hcxt = buffer->context;
309 
310  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
311  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
312 
313  buffer->by_txn_last_xid = InvalidTransactionId;
314  buffer->by_txn_last_txn = NULL;
315 
316  buffer->outbuf = NULL;
317  buffer->outbufsize = 0;
318  buffer->size = 0;
319 
320  buffer->spillCount = 0;
321  buffer->spillTxns = 0;
322  buffer->spillBytes = 0;
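323 
324  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
325 
326  dlist_init(&buffer->toplevel_by_lsn);
327  dlist_init(&buffer->txns_by_base_snapshot_lsn);
328 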
329  /*
330  * Ensure there's no stale data from prior uses of this slot, in case some
331  * prior exit avoided calling ReorderBufferFree. Failure to do this can
332  * produce duplicated txns, and it's very cheap if there's nothing there.
333  */
334  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
335 
336  return buffer;
337 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:513
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define NameStr(name)
Definition: c.h:615
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 830 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by DecodeXactOp(), and ReorderBufferCommitChild().

832 {
833  ReorderBufferTXN *txn;
834  ReorderBufferTXN *subtxn;
835  bool new_top;
836  bool new_sub;
837 
838  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
839  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
840 
841  if (!new_sub)
842  {
843  if (rbtxn_is_known_subxact(subtxn))
844  {
845  /* already associated, nothing to do */
846  return;
847  }
848  else
849  {
850  /*
851  * We already saw this transaction, but initially added it to the
852  * list of top-level txns. Now that we know it's not top-level,
853  * remove it from there.
854  */
855  dlist_delete(&subtxn->node);
856  }
857  }
858 
859  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
860  subtxn->toplevel_xid = xid;
861  Assert(subtxn->nsubtxns == 0);
862 
863  /* add to subtransaction list */
864  dlist_push_tail(&txn->subtxns, &subtxn->node);
865  txn->nsubtxns++;
866 
867  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
868  ReorderBufferTransferSnapToParent(txn, subtxn);
869 
870  /* Verify LSN-ordering invariant */
871  AssertTXNLsnOrder(rb);
872 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1343 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1344 {
1345  dlist_iter iter;
1346  HASHCTL hash_ctl;
1347 
1348  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1349  return;
1350 
1351  memset(&hash_ctl, 0, sizeof(hash_ctl));
1352 
1353  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1354  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1355  hash_ctl.hcxt = rb->context;
1356 
1357  /*
1358  * create the hash with the exact number of to-be-stored tuplecids from
1359  * the start
1360  */
1361  txn->tuplecid_hash =
1362  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1363  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1364 
1365  dlist_foreach(iter, &txn->tuplecids)
1366  {
1367  ReorderBufferTupleCidKey key;
1368  ReorderBufferTupleCidEnt *ent;
1369  bool found;
1370  ReorderBufferChange *change;
1371 
1372  change = dlist_container(ReorderBufferChange, node, iter.cur);
1373 
1374  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1375 
1376  /* be careful about padding */
1377  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1378 
1379  key.relnode = change->data.tuplecid.node;
1380 
1381  ItemPointerCopy(&change->data.tuplecid.tid,
1382  &key.tid);
1383 
1384  ent = (ReorderBufferTupleCidEnt *)
1385  hash_search(txn->tuplecid_hash,
1386  (void *) &key,
1387  HASH_ENTER | HASH_FIND,
1388  &found);
1389  if (!found)
1390  {
1391  ent->cmin = change->data.tuplecid.cmin;
1392  ent->cmax = change->data.tuplecid.cmax;
1393  ent->combocid = change->data.tuplecid.combocid;
1394  }
1395  else
1396  {
1397  /*
1398  * Maybe we already saw this tuple before in this transaction, but
1399  * if so it must have the same cmin.
1400  */
1401  Assert(ent->cmin == change->data.tuplecid.cmin);
1402 
1403  /*
1404  * cmax may be initially invalid, but once set it can only grow,
1405  * and never become invalid again.
1406  */
1407  Assert((ent->cmax == InvalidCommandId) ||
1408  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1409  (change->data.tuplecid.cmax > ent->cmax)));
1410  ent->cmax = change->data.tuplecid.cmax;
1411  }
1412  }
1413 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
struct ReorderBufferChange::@99::@103 tuplecid
Size entrysize
Definition: hsearch.h:73
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:530
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
union ReorderBufferChange::@99 data
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *rb,
ReorderBufferChange *change,
bool addition
)
static

Definition at line 2143 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

2146 {
2147  Size sz;
2148 
2149  Assert(change->txn);
2150 
2151  /*
2152  * Ignore tuple CID changes, because those are not evicted when reaching
2153  * memory limit. So we just don't count them, because it might easily
2154  * trigger a pointless attempt to spill.
2155  */
2157  return;
2158 
2159  sz = ReorderBufferChangeSize(change);
2160 
2161  if (addition)
2162  {
2163  change->txn->size += sz;
2164  rb->size += sz;
2165  }
2166  else
2167  {
2168  Assert((rb->size >= sz) && (change->txn->size >= sz));
2169  change->txn->size -= sz;
2170  rb->size -= sz;
2171  }
2172 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:738
size_t Size
Definition: c.h:466

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *change)
static

Definition at line 2698 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

2699 {
2700  Size sz = sizeof(ReorderBufferChange);
2701 
2702  switch (change->action)
2703  {
2704  /* fall through these, they're all similar enough */
2709  {
2710  ReorderBufferTupleBuf *oldtup,
2711  *newtup;
2712  Size oldlen = 0;
2713  Size newlen = 0;
2714 
2715  oldtup = change->data.tp.oldtuple;
2716  newtup = change->data.tp.newtuple;
2717 
2718  if (oldtup)
2719  {
2720  sz += sizeof(HeapTupleData);
2721  oldlen = oldtup->tuple.t_len;
2722  sz += oldlen;
2723  }
2724 
2725  if (newtup)
2726  {
2727  sz += sizeof(HeapTupleData);
2728  newlen = newtup->tuple.t_len;
2729  sz += newlen;
2730  }
2731 
2732  break;
2733  }
2735  {
2736  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2737 
2738  sz += prefix_size + change->data.msg.message_size +
2739  sizeof(Size) + sizeof(Size);
2740 
2741  break;
2742  }
2744  {
2745  Snapshot snap;
2746 
2747  snap = change->data.snapshot;
2748 
2749  sz += sizeof(SnapshotData) +
2750  sizeof(TransactionId) * snap->xcnt +
2751  sizeof(TransactionId) * snap->subxcnt;
2752 
2753  break;
2754  }
2756  {
2757  sz += sizeof(Oid) * change->data.truncate.nrelids;
2758 
2759  break;
2760  }
2764  /* ReorderBufferChange contains everything important */
2765  break;
2766  }
2767 
2768  return sz;
2769 }
struct ReorderBufferChange::@99::@101 truncate
uint32 TransactionId
Definition: c.h:513
struct ReorderBufferChange::@99::@102 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
union ReorderBufferChange::@99 data
struct ReorderBufferChange::@99::@100 tp
size_t Size
Definition: c.h:466
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *rb)
static

Definition at line 2371 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

2372 {
2373  ReorderBufferTXN *txn;
2374 
2375  /* bail out if we haven't exceeded the memory limit */
2376  if (rb->size < logical_decoding_work_mem * 1024L)
2377  return;
2378 
2379  /*
2380  * Loop until we are under the memory limit. One might think that just
2381  * by evicting the largest (sub)transaction we will come under the memory
2382  * limit, based on the assumption that the selected transaction is at
2383  * least as large as the most recent change (which caused us to go over
2384  * the memory limit). However, that is not true because a user can reduce
2385  * logical_decoding_work_mem to a smaller value before the most recent
2386  * change.
2387  */
2388  while (rb->size >= logical_decoding_work_mem * 1024L)
2389  {
2390  /*
2391  * Pick the largest transaction (or subtransaction) and evict it from
2392  * memory by serializing it to disk.
2393  */
2394  txn = ReorderBufferLargestTXN(rb);
2395 
2396  ReorderBufferSerializeTXN(rb, txn);
2397 
2398  /*
2399  * After eviction, the transaction should have no entries in memory,
2400  * and should use 0 bytes for changes.
2401  */
2402  Assert(txn->size == 0);
2403  Assert(txn->nentries_mem == 0);
2404  }
2405 
2406  /* We must be under the memory limit now. */
2407  Assert(rb->size < logical_decoding_work_mem * 1024L);
2408 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:738
int logical_decoding_work_mem

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 3100 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

3101 {
3102  DIR *spill_dir;
3103  struct dirent *spill_de;
3104  struct stat statbuf;
3105  char path[MAXPGPATH * 2 + 12];
3106 
3107  sprintf(path, "pg_replslot/%s", slotname);
3108 
3109  /* we're only handling directories here, skip if it's not ours */
3110  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3111  return;
3112 
3113  spill_dir = AllocateDir(path);
3114  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3115  {
3116  /* only look at names that can be ours */
3117  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3118  {
3119  snprintf(path, sizeof(path),
3120  "pg_replslot/%s/%s", slotname,
3121  spill_de->d_name);
3122 
3123  if (unlink(path) != 0)
3124  ereport(ERROR,
3126  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3127  path, slotname)));
3128  }
3129  }
3130  FreeDir(spill_dir);
3131 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2664
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:195
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2583
#define stat(a, b)
Definition: win32_port.h:255
#define ereport(elevel,...)
Definition: elog.h:144
#define S_ISDIR(m)
Definition: win32_port.h:296
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:824
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:193
int FreeDir(DIR *dir)
Definition: fd.c:2701

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *rb,
ReorderBufferTXN *txn
)
static

Definition at line 1251 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1252 {
1253  bool found;
1254  dlist_mutable_iter iter;
1255 
1256  /* cleanup subtransactions & their changes */
1257  dlist_foreach_modify(iter, &txn->subtxns)
1258  {
1259  ReorderBufferTXN *subtxn;
1260 
1261  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1262 
1263  /*
1264  * Subtransactions are always associated to the toplevel TXN, even if
1265  * they originally were happening inside another subtxn, so we won't
1266  * ever recurse more than one level deep here.
1267  */
1268  Assert(rbtxn_is_known_subxact(subtxn));
1269  Assert(subtxn->nsubtxns == 0);
1270 
1271  ReorderBufferCleanupTXN(rb, subtxn);
1272  }
1273 
1274  /* cleanup changes in the toplevel txn */
1275  dlist_foreach_modify(iter, &txn->changes)
1276  {
1277  ReorderBufferChange *change;
1278 
1279  change = dlist_container(ReorderBufferChange, node, iter.cur);
1280 
1281  /* Check we're not mixing changes from different transactions. */
1282  Assert(change->txn == txn);
1283 
1284  ReorderBufferReturnChange(rb, change);
1285  }
1286 
1287  /*
1288  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1289  * They are always stored in the toplevel transaction.
1290  */
1291  dlist_foreach_modify(iter, &txn->tuplecids)
1292  {
1293  ReorderBufferChange *change;
1294 
1295  change = dlist_container(ReorderBufferChange, node, iter.cur);
1296 
1297  /* Check we're not mixing changes from different transactions. */
1298  Assert(change->txn == txn);
1300 
1301  ReorderBufferReturnChange(rb, change);
1302  }
1303 
1304  /*
1305  * Cleanup the base snapshot, if set.
1306  */
1307  if (txn->base_snapshot != NULL)
1308  {
1311  }
1312 
1313  /*
1314  * Remove TXN from its containing list.
1315  *
1316  * Note: if txn is known as subxact, we are deleting the TXN from its
1317  * parent's list of known subxacts; this leaves the parent's nsubtxns
1318  * count too high, but we don't care. Otherwise, we are deleting the TXN
1319  * from the LSN-ordered list of toplevel TXNs.
1320  */
1321  dlist_delete(&txn->node);
1322 
1323  /* now remove reference from buffer */
1324  hash_search(rb->by_txn,
1325  (void *) &txn->xid,
1326  HASH_REMOVE,
1327  &found);
1328  Assert(found);
1329 
1330  /* remove entries spilled to disk */
1331  if (rbtxn_is_serialized(txn))
1332  ReorderBufferRestoreCleanup(rb, txn);
1333 
1334  /* deallocate */
1335  ReorderBufferReturnTXN(rb, txn);
1336 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:738
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1502 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), 
TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1506 {
1507  ReorderBufferTXN *txn;
1508  volatile Snapshot snapshot_now;
1509  volatile CommandId command_id = FirstCommandId;
1510  bool using_subtxn;
1511  ReorderBufferIterTXNState *volatile iterstate = NULL;
1512 
1513  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1514  false);
1515 
1516  /* unknown transaction, nothing to replay */
1517  if (txn == NULL)
1518  return;
1519 
1520  txn->final_lsn = commit_lsn;
1521  txn->end_lsn = end_lsn;
1522  txn->commit_time = commit_time;
1523  txn->origin_id = origin_id;
1524  txn->origin_lsn = origin_lsn;
1525 
1526  /*
1527  * If this transaction has no snapshot, it didn't make any changes to the
1528  * database, so there's nothing to decode. Note that
1529  * ReorderBufferCommitChild will have transferred any snapshots from
1530  * subtransactions if there were any.
1531  */
1532  if (txn->base_snapshot == NULL)
1533  {
1534  Assert(txn->ninvalidations == 0);
1535  ReorderBufferCleanupTXN(rb, txn);
1536  return;
1537  }
1538 
1539  snapshot_now = txn->base_snapshot;
1540 
1541  /* build data to be able to lookup the CommandIds of catalog tuples */
1543 
1544  /* setup the initial snapshot */
1545  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1546 
1547  /*
1548  * Decoding needs access to syscaches et al., which in turn use
1549  * heavyweight locks and such. Thus we need to have enough state around to
1550  * keep track of those. The easiest way is to simply use a transaction
1551  * internally. That also allows us to easily enforce that nothing writes
1552  * to the database by checking for xid assignments.
1553  *
1554  * When we're called via the SQL SRF there's already a transaction
1555  * started, so start an explicit subtransaction there.
1556  */
1557  using_subtxn = IsTransactionOrTransactionBlock();
1558 
1559  PG_TRY();
1560  {
1561  ReorderBufferChange *change;
1562  ReorderBufferChange *specinsert = NULL;
1563 
1564  if (using_subtxn)
1565  BeginInternalSubTransaction("replay");
1566  else
1568 
1569  rb->begin(rb, txn);
1570 
1571  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1572  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1573  {
1574  Relation relation = NULL;
1575  Oid reloid;
1576 
1577  switch (change->action)
1578  {
1580 
1581  /*
1582  * Confirmation for speculative insertion arrived. Simply
1583  * use as a normal record. It'll be cleaned up at the end
1584  * of INSERT processing.
1585  */
1586  if (specinsert == NULL)
1587  elog(ERROR, "invalid ordering of speculative insertion changes");
1588  Assert(specinsert->data.tp.oldtuple == NULL);
1589  change = specinsert;
1591 
1592  /* intentionally fall through */
1596  Assert(snapshot_now);
1597 
1598  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1599  change->data.tp.relnode.relNode);
1600 
1601  /*
1602  * Mapped catalog tuple without data, emitted while
1603  * catalog table was in the process of being rewritten. We
1604  * can fail to look up the relfilenode, because the
1605  * relmapper has no "historic" view, in contrast to the
1606  * normal catalog during decoding. Thus repeated
1607  * rewrites can cause a lookup failure. That's OK because
1608  * we do not decode catalog changes anyway. Normally such
1609  * tuples would be skipped over below, but we can't
1610  * identify whether the table should be logically logged
1611  * without mapping the relfilenode to the oid.
1612  */
1613  if (reloid == InvalidOid &&
1614  change->data.tp.newtuple == NULL &&
1615  change->data.tp.oldtuple == NULL)
1616  goto change_done;
1617  else if (reloid == InvalidOid)
1618  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1619  relpathperm(change->data.tp.relnode,
1620  MAIN_FORKNUM));
1621 
1622  relation = RelationIdGetRelation(reloid);
1623 
1624  if (!RelationIsValid(relation))
1625  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1626  reloid,
1627  relpathperm(change->data.tp.relnode,
1628  MAIN_FORKNUM));
1629 
1630  if (!RelationIsLogicallyLogged(relation))
1631  goto change_done;
1632 
1633  /*
1634  * Ignore temporary heaps created during DDL unless the
1635  * plugin has asked for them.
1636  */
1637  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1638  goto change_done;
1639 
1640  /*
1641  * For now ignore sequence changes entirely. Most of the
1642  * time they don't log changes using records we
1643  * understand, so it doesn't make sense to handle the few
1644  * cases we do.
1645  */
1646  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1647  goto change_done;
1648 
1649  /* user-triggered change */
1650  if (!IsToastRelation(relation))
1651  {
1652  ReorderBufferToastReplace(rb, txn, relation, change);
1653  rb->apply_change(rb, txn, relation, change);
1654 
1655  /*
1656  * Only clear reassembled toast chunks if we're sure
1657  * they're not required anymore. The creator of the
1658  * tuple tells us.
1659  */
1660  if (change->data.tp.clear_toast_afterwards)
1661  ReorderBufferToastReset(rb, txn);
1662  }
1663  /* we're not interested in toast deletions */
1664  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1665  {
1666  /*
1667  * Need to reassemble the full toasted Datum in
1668  * memory. To ensure the chunks don't get reused till
1669  * we're done, remove it from the list of this
1670  * transaction's changes. Otherwise it will get
1671  * freed/reused while restoring spooled data from
1672  * disk.
1673  */
1674  Assert(change->data.tp.newtuple != NULL);
1675 
1676  dlist_delete(&change->node);
1677  ReorderBufferToastAppendChunk(rb, txn, relation,
1678  change);
1679  }
1680 
1681  change_done:
1682 
1683  /*
1684  * Either speculative insertion was confirmed, or it was
1685  * unsuccessful and the record isn't needed anymore.
1686  */
1687  if (specinsert != NULL)
1688  {
1689  ReorderBufferReturnChange(rb, specinsert);
1690  specinsert = NULL;
1691  }
1692 
1693  if (relation != NULL)
1694  {
1695  RelationClose(relation);
1696  relation = NULL;
1697  }
1698  break;
1699 
1701 
1702  /*
1703  * Speculative insertions are dealt with by delaying the
1704  * processing of the insert until the confirmation record
1705  * arrives. For that we simply unlink the record from the
1706  * chain, so it does not get freed/reused while restoring
1707  * spooled data from disk.
1708  *
1709  * This is safe in the face of concurrent catalog changes
1710  * because the relevant relation can't be changed between
1711  * speculative insertion and confirmation due to
1712  * CheckTableNotInUse() and locking.
1713  */
1714 
1715  /* clear out a pending (and thus failed) speculation */
1716  if (specinsert != NULL)
1717  {
1718  ReorderBufferReturnChange(rb, specinsert);
1719  specinsert = NULL;
1720  }
1721 
1722  /* and memorize the pending insertion */
1723  dlist_delete(&change->node);
1724  specinsert = change;
1725  break;
1726 
1728  {
1729  int i;
1730  int nrelids = change->data.truncate.nrelids;
1731  int nrelations = 0;
1732  Relation *relations;
1733 
1734  relations = palloc0(nrelids * sizeof(Relation));
1735  for (i = 0; i < nrelids; i++)
1736  {
1737  Oid relid = change->data.truncate.relids[i];
1738  Relation relation;
1739 
1740  relation = RelationIdGetRelation(relid);
1741 
1742  if (!RelationIsValid(relation))
1743  elog(ERROR, "could not open relation with OID %u", relid);
1744 
1745  if (!RelationIsLogicallyLogged(relation))
1746  continue;
1747 
1748  relations[nrelations++] = relation;
1749  }
1750 
1751  rb->apply_truncate(rb, txn, nrelations, relations, change);
1752 
1753  for (i = 0; i < nrelations; i++)
1754  RelationClose(relations[i]);
1755 
1756  break;
1757  }
1758 
1760  rb->message(rb, txn, change->lsn, true,
1761  change->data.msg.prefix,
1762  change->data.msg.message_size,
1763  change->data.msg.message);
1764  break;
1765 
1767  /* get rid of the old */
1768  TeardownHistoricSnapshot(false);
1769 
1770  if (snapshot_now->copied)
1771  {
1772  ReorderBufferFreeSnap(rb, snapshot_now);
1773  snapshot_now =
1774  ReorderBufferCopySnap(rb, change->data.snapshot,
1775  txn, command_id);
1776  }
1777 
1778  /*
1779  * Restored from disk, need to be careful not to double
1780  * free. We could introduce refcounting for that, but for
1781  * now this seems infrequent enough not to care.
1782  */
1783  else if (change->data.snapshot->copied)
1784  {
1785  snapshot_now =
1786  ReorderBufferCopySnap(rb, change->data.snapshot,
1787  txn, command_id);
1788  }
1789  else
1790  {
1791  snapshot_now = change->data.snapshot;
1792  }
1793 
1794 
1795  /* and continue with the new one */
1796  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1797  break;
1798 
1800  Assert(change->data.command_id != InvalidCommandId);
1801 
1802  if (command_id < change->data.command_id)
1803  {
1804  command_id = change->data.command_id;
1805 
1806  if (!snapshot_now->copied)
1807  {
1808  /* we don't use the global one anymore */
1809  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1810  txn, command_id);
1811  }
1812 
1813  snapshot_now->curcid = command_id;
1814 
1815  TeardownHistoricSnapshot(false);
1816  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1817 
1818  /*
1819  * Every time the CommandId is incremented, we could
1820  * see new catalog contents, so execute all
1821  * invalidations.
1822  */
1824  }
1825 
1826  break;
1827 
1829  elog(ERROR, "tuplecid value in changequeue");
1830  break;
1831  }
1832  }
1833 
1834  /*
1835  * There's a speculative insertion remaining, just clean it up; it
1836  * can't have been successful, otherwise we'd have gotten a confirmation
1837  * record.
1838  */
1839  if (specinsert)
1840  {
1841  ReorderBufferReturnChange(rb, specinsert);
1842  specinsert = NULL;
1843  }
1844 
1845  /* clean up the iterator */
1846  ReorderBufferIterTXNFinish(rb, iterstate);
1847  iterstate = NULL;
1848 
1849  /* call commit callback */
1850  rb->commit(rb, txn, commit_lsn);
1851 
1852  /* this is just a sanity check against bad output plugin behaviour */
1854  elog(ERROR, "output plugin used XID %u",
1856 
1857  /* cleanup */
1858  TeardownHistoricSnapshot(false);
1859 
1860  /*
1861  * Aborting the current (sub-)transaction as a whole has the right
1862  * semantics. We want all locks acquired in here to be released, not
1863  * reassigned to the parent and we do not want any database access
1864  * have persistent effects.
1865  */
1867 
1868  /* make sure there's no cache pollution */
1870 
1871  if (using_subtxn)
1873 
1874  if (snapshot_now->copied)
1875  ReorderBufferFreeSnap(rb, snapshot_now);
1876 
1877  /* remove potential on-disk data, and deallocate */
1878  ReorderBufferCleanupTXN(rb, txn);
1879  }
1880  PG_CATCH();
1881  {
1882  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1883  if (iterstate)
1884  ReorderBufferIterTXNFinish(rb, iterstate);
1885 
1887 
1888  /*
1889  * Force cache invalidation to happen outside of a valid transaction
1890  * to prevent catalog access as we just caught an error.
1891  */
1893 
1894  /* make sure there's no cache pollution */
1896 
1897  if (using_subtxn)
1899 
1900  if (snapshot_now->copied)
1901  ReorderBufferFreeSnap(rb, snapshot_now);
1902 
1903  /* remove potential on-disk data, and deallocate */
1904  ReorderBufferCleanupTXN(rb, txn);
1905 
1906  PG_RE_THROW();
1907  }
1908  PG_END_TRY();
1909 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:527
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:3183
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
struct ReorderBufferChange::@99::@101 truncate
bool copied
Definition: snapshot.h:185
struct ReorderBufferChange::@99::@102 msg
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4674
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:529
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:429
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4485
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:440
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2103
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidCommandId
Definition: c.h:530
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferChange::@99 data
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:305
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2818
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4380
#define PG_RE_THROW()
Definition: elog.h:336
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferApplyTruncateCB apply_truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:214
int i
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:295
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1997
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 947 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

950 {
951  ReorderBufferTXN *subtxn;
952 
953  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
954  InvalidXLogRecPtr, false);
955 
956  /*
957  * No need to do anything if that subtxn didn't contain any changes
958  */
959  if (!subtxn)
960  return;
961 
962  subtxn->final_lsn = commit_lsn;
963  subtxn->end_lsn = end_lsn;
964 
965  /*
966  * Assign this subxact as a child of the toplevel xact (no-op if already
967  * done.)
968  */
970 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1421 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1423 {
1424  Snapshot snap;
1425  dlist_iter iter;
1426  int i = 0;
1427  Size size;
1428 
1429  size = sizeof(SnapshotData) +
1430  sizeof(TransactionId) * orig_snap->xcnt +
1431  sizeof(TransactionId) * (txn->nsubtxns + 1);
1432 
1433  snap = MemoryContextAllocZero(rb->context, size);
1434  memcpy(snap, orig_snap, sizeof(SnapshotData));
1435 
1436  snap->copied = true;
1437  snap->active_count = 1; /* mark as active so nobody frees it */
1438  snap->regd_count = 0;
1439  snap->xip = (TransactionId *) (snap + 1);
1440 
1441  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1442 
1443  /*
1444  * snap->subxip contains all txids that belong to our transaction which we
1445  * need to check via cmin/cmax. That's why we store the toplevel
1446  * transaction in there as well.
1447  */
1448  snap->subxip = snap->xip + snap->xcnt;
1449  snap->subxip[i++] = txn->xid;
1450 
1451  /*
1452  * subxcnt isn't decreased when subtransactions abort, so count manually.
1453  * Since it's an upper boundary it is safe to use it for the allocation
1454  * above.
1455  */
1456  snap->subxcnt = 1;
1457 
1458  dlist_foreach(iter, &txn->subtxns)
1459  {
1460  ReorderBufferTXN *sub_txn;
1461 
1462  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1463  snap->subxip[i++] = sub_txn->xid;
1464  snap->subxcnt++;
1465  }
1466 
1467  /* sort so we can bsearch() later */
1468  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1469 
1470  /* store the specified current CommandId */
1471  snap->curcid = cid;
1472 
1473  return snap;
1474 }
uint32 TransactionId
Definition: c.h:513
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:199
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:466
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:479
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 2237 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

2238 {
2239  int i;
2240 
2241  for (i = 0; i < txn->ninvalidations; i++)
2242  LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2243 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1993 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1994 {
1995  ReorderBufferTXN *txn;
1996 
1997  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1998  false);
1999 
2000  /* unknown, nothing to forget */
2001  if (txn == NULL)
2002  return;
2003 
2004  /* cosmetic... */
2005  txn->final_lsn = lsn;
2006 
2007  /*
2008  * Process cache invalidation messages if there are any. Even if we're not
2009  * interested in the transaction's contents, it could have manipulated the
2010  * catalog and we need to update the caches according to that.
2011  */
2012  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2013  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2014  txn->invalidations);
2015  else
2016  Assert(txn->ninvalidations == 0);
2017 
2018  /* remove potential on-disk data, and deallocate */
2019  ReorderBufferCleanupTXN(rb, txn);
2020 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:738
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer *  rb)

Definition at line 343 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

344 {
345  MemoryContext context = rb->context;
346 
347  /*
348  * We free separately allocated data by entirely scrapping reorderbuffer's
349  * memory context.
350  */
351  MemoryContextDelete(context);
352 
353  /* Free disk space used by unconsumed reorder buffers */
354  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
355 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:615
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer *  rb,
Snapshot  snap 
)
static

Definition at line 1480 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1481 {
1482  if (snap->copied)
1483  pfree(snap);
1484  else
1485  SnapBuildSnapDecRefcount(snap);
1486 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1056
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer *  rb)

Definition at line 411 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

412 {
413  ReorderBufferChange *change;
414 
415  change = (ReorderBufferChange *)
416  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
417 
418  memset(change, 0, sizeof(ReorderBufferChange));
419  return change;
420 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer *  rb)

Definition at line 775 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

776 {
777  ReorderBufferTXN *txn;
778 
779  AssertTXNLsnOrder(rb);
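780 
781  if (dlist_is_empty(&rb->toplevel_by_lsn))
782  return NULL;
783 
784  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
785 
786  Assert(!rbtxn_is_known_subxact(txn));
787  Assert(txn->first_lsn != InvalidXLogRecPtr);
788  return txn;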
789 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer *  rb)

Definition at line 803 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

804 {
805  ReorderBufferTXN *txn;
806 
807  AssertTXNLsnOrder(rb);
808 
809  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
810  return InvalidTransactionId;
811 
812  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
813  &rb->txns_by_base_snapshot_lsn);
814  return txn->base_snapshot->xmin;
815 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer *  rb,
int  nrelids 
)

Definition at line 523 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

524 {
525  Oid *relids;
526  Size alloc_len;
527 
528  alloc_len = sizeof(Oid) * nrelids;
529 
530  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
531 
532  return relids;
533 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer *  rb,
Size  tuple_len 
)

Definition at line 487 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

488 {
489  ReorderBufferTupleBuf *tuple;
490  Size alloc_len;
491 
492  alloc_len = tuple_len + SizeofHeapTupleHeader;
493 
494  tuple = (ReorderBufferTupleBuf *)
495  MemoryContextAlloc(rb->tup_context,
496  sizeof(ReorderBufferTupleBuf) +
497  MAXIMUM_ALIGNOF + alloc_len);
498  tuple->alloc_tuple_size = alloc_len;
499  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
500 
501  return tuple;
502 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:466
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer *  rb)
static

Definition at line 361 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

362 {
363  ReorderBufferTXN *txn;
364 
365  txn = (ReorderBufferTXN *)
366  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
367 
368  memset(txn, 0, sizeof(ReorderBufferTXN));
369 
370  dlist_init(&txn->changes);
371  dlist_init(&txn->tuplecids);
372  dlist_init(&txn->subtxns);
373 
374  return txn;
375 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer *  rb,
uint32  ninvalidations,
SharedInvalidationMessage *  invalidations 
)

Definition at line 2029 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

2031 {
2032  bool use_subtxn = IsTransactionOrTransactionBlock();
2033  int i;
2034 
2035  if (use_subtxn)
2036  BeginInternalSubTransaction("replay");
2037 
2038  /*
2039  * Force invalidations to happen outside of a valid transaction - that way
2040  * entries will just be marked as invalid without accessing the catalog.
2041  * That's advantageous because we don't need to setup the full state
2042  * necessary for catalog access.
2043  */
2044  if (use_subtxn)
2045  AbortCurrentTransaction();
2046 
2047  for (i = 0; i < ninvalidations; i++)
2048  LocalExecuteInvalidationMessage(&invalidations[i]);
2049 
2050  if (use_subtxn)
2051  RollbackAndReleaseCurrentSubTransaction();
2052 }
void AbortCurrentTransaction(void)
Definition: xact.c:3183
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4674
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4485
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4380
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 989 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

990 {
991  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
992  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
993  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
994 
995  if (pos_a < pos_b)
996  return 1;
997  else if (pos_a == pos_b)
998  return 0;
999  return -1;
1000 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1220 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferCommit().

1222 {
1223  int32 off;
1224 
1225  for (off = 0; off < state->nr_txns; off++)
1226  {
1227  if (state->entries[off].file.vfd != -1)
1228  FileClose(state->entries[off].file.vfd);
1229  }
1230 
1231  /* free memory we might have "leaked" in the last *Next call */
1232  if (!dlist_is_empty(&state->old_change))
1233  {
1234  ReorderBufferChange *change;
1235 
1236  change = dlist_container(ReorderBufferChange, node,
1237  dlist_pop_head_node(&state->old_change));
1238  ReorderBufferReturnChange(rb, change);
1239  Assert(dlist_is_empty(&state->old_change));
1240  }
1241 
1242  binaryheap_free(state->heap);
1243  pfree(state);
1244 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:355
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
void FileClose(File file)
Definition: fd.c:1826
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1012 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferCommit().

1014 {
1015  Size nr_txns = 0;
1016  ReorderBufferIterTXNState *state;
1017  dlist_iter cur_txn_i;
1018  int32 off;
1019 
1020  *iter_state = NULL;
1021 
1022  /*
1023  * Calculate the size of our heap: one element for every transaction that
1024  * contains changes. (Besides the transactions already in the reorder
1025  * buffer, we count the one we were directly passed.)
1026  */
1027  if (txn->nentries > 0)
1028  nr_txns++;
1029 
1030  dlist_foreach(cur_txn_i, &txn->subtxns)
1031  {
1032  ReorderBufferTXN *cur_txn;
1033 
1034  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1035 
1036  if (cur_txn->nentries > 0)
1037  nr_txns++;
1038  }
1039 
1040  /* allocate iteration state */
1041  state = (ReorderBufferIterTXNState *)
1042  MemoryContextAllocZero(rb->context,
1043  sizeof(ReorderBufferIterTXNState) +
1044  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1045 
1046  state->nr_txns = nr_txns;
1047  dlist_init(&state->old_change);
1048 
1049  for (off = 0; off < state->nr_txns; off++)
1050  {
1051  state->entries[off].file.vfd = -1;
1052  state->entries[off].segno = 0;
1053  }
1054 
1055  /* allocate heap */
1056  state->heap = binaryheap_allocate(state->nr_txns,
1057  ReorderBufferIterCompare,
1058  state);
1059 
1060  /* Now that the state fields are initialized, it is safe to return it. */
1061  *iter_state = state;
1062 
1063  /*
1064  * Now insert items into the binary heap, in an unordered fashion. (We
1065  * will run a heap assembly step at the end; this is more efficient.)
1066  */
1067 
1068  off = 0;
1069 
1070  /* add toplevel transaction if it contains changes */
1071  if (txn->nentries > 0)
1072  {
1073  ReorderBufferChange *cur_change;
1074 
1075  if (rbtxn_is_serialized(txn))
1076  {
1077  /* serialize remaining changes */
1078  ReorderBufferSerializeTXN(rb, txn);
1079  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1080  &state->entries[off].segno);
1081  }
1082 
1083  cur_change = dlist_head_element(ReorderBufferChange, node,
1084  &txn->changes);
1085 
1086  state->entries[off].lsn = cur_change->lsn;
1087  state->entries[off].change = cur_change;
1088  state->entries[off].txn = txn;
1089 
1090  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1091  }
1092 
1093  /* add subtransactions if they contain changes */
1094  dlist_foreach(cur_txn_i, &txn->subtxns)
1095  {
1096  ReorderBufferTXN *cur_txn;
1097 
1098  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1099 
1100  if (cur_txn->nentries > 0)
1101  {
1102  ReorderBufferChange *cur_change;
1103 
1104  if (rbtxn_is_serialized(cur_txn))
1105  {
1106  /* serialize remaining changes */
1107  ReorderBufferSerializeTXN(rb, cur_txn);
1108  ReorderBufferRestoreChanges(rb, cur_txn,
1109  &state->entries[off].file,
1110  &state->entries[off].segno);
1111  }
1112  cur_change = dlist_head_element(ReorderBufferChange, node,
1113  &cur_txn->changes);
1114 
1115  state->entries[off].lsn = cur_change->lsn;
1116  state->entries[off].change = cur_change;
1117  state->entries[off].txn = cur_txn;
1118 
1119  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1120  }
1121  }
1122 
1123  /* assemble a valid binary heap */
1124  binaryheap_build(state->heap);
1125 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:355
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:466
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer *  rb,
ReorderBufferIterTXNState *  state 
)
static

Definition at line 1134 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

1135 {
1136  ReorderBufferChange *change;
1137  ReorderBufferIterTXNEntry *entry;
1138  int32 off;
1139 
1140  /* nothing there anymore */
1141  if (state->heap->bh_size == 0)
1142  return NULL;
1143 
1144  off = DatumGetInt32(binaryheap_first(state->heap));
1145  entry = &state->entries[off];
1146 
1147  /* free memory we might have "leaked" in the previous *Next call */
1148  if (!dlist_is_empty(&state->old_change))
1149  {
1150  change = dlist_container(ReorderBufferChange, node,
1151  dlist_pop_head_node(&state->old_change));
1152  ReorderBufferReturnChange(rb, change);
1153  Assert(dlist_is_empty(&state->old_change));
1154  }
1155 
1156  change = entry->change;
1157 
1158  /*
1159  * update heap with information about which transaction has the next
1160  * relevant change in LSN order
1161  */
1162 
1163  /* there are in-memory changes */
1164  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1165  {
1166  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1167  ReorderBufferChange *next_change =
1168  dlist_container(ReorderBufferChange, node, next);
1169 
1170  /* txn stays the same */
1171  state->entries[off].lsn = next_change->lsn;
1172  state->entries[off].change = next_change;
1173 
1174  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1175  return change;
1176  }
1177 
1178  /* try to load changes from disk */
1179  if (entry->txn->nentries != entry->txn->nentries_mem)
1180  {
1181  /*
1182  * Ugly: restoring changes will reuse *Change records, thus delete the
1183  * current one from the per-tx list and only free in the next call.
1184  */
1185  dlist_delete(&change->node);
1186  dlist_push_tail(&state->old_change, &change->node);
1187 
1188  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1189  &state->entries[off].segno))
1190  {
1191  /* successfully restored changes from disk */
1192  ReorderBufferChange *next_change =
1193  dlist_head_element(ReorderBufferChange, node,
1194  &entry->txn->changes);
1195 
1196  elog(DEBUG2, "restored %u/%u changes from disk",
1197  (uint32) entry->txn->nentries_mem,
1198  (uint32) entry->txn->nentries);
1199 
1200  Assert(entry->txn->nentries_mem);
1201  /* txn stays the same */
1202  state->entries[off].lsn = next_change->lsn;
1203  state->entries[off].change = next_change;
1204  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1205 
1206  return change;
1207  }
1208  }
1209 
1210  /* ok, no changes there anymore, remove */
1211  binaryheap_remove_first(state->heap);
1212 
1213  return change;
1214 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:218
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:355
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:367
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:214
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer *  rb)
static

Definition at line 2337 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

2338 {
2339  HASH_SEQ_STATUS hash_seq;
2340  ReorderBufferTXNByIdEnt *ent;
2341  ReorderBufferTXN *largest = NULL;
2342 
2343  hash_seq_init(&hash_seq, rb->by_txn);
2344  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2345  {
2346  ReorderBufferTXN *txn = ent->txn;
2347 
2348  /* if the current transaction is larger, remember it */
2349  if ((!largest) || (txn->size > largest->size))
2350  largest = txn;
2351  }
2352 
2353  Assert(largest);
2354  Assert(largest->size > 0);
2355  Assert(largest->size <= rb->size);
2356 
2357  return largest;
2358 }
#define Assert(condition)
Definition: c.h:738
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1391
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1381
ReorderBufferTXN * txn

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2065 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2066 {
2067  /* many records won't have an xid assigned, centralize check here */
2068  if (xid != InvalidTransactionId)
2069  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2070 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change 
)

Definition at line 634 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

636 {
637  ReorderBufferTXN *txn;
638 
639  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
640 
641  change->lsn = lsn;
642  change->txn = txn;
643 
644  Assert(InvalidXLogRecPtr != lsn);
645  dlist_push_tail(&txn->changes, &change->node);
646  txn->nentries++;
647  txn->nentries_mem++;
648 
649  /* update memory accounting information */
650  ReorderBufferChangeMemoryUpdate(rb, change, true);
651 
652  /* check the memory limits and evict something if needed */
653  ReorderBufferCheckMemoryLimit(rb);
654 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 660 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

664 {
665  if (transactional)
666  {
667  MemoryContext oldcontext;
668  ReorderBufferChange *change;
669 
670  Assert(xid != InvalidTransactionId);
671 
672  oldcontext = MemoryContextSwitchTo(rb->context);
673 
674  change = ReorderBufferGetChange(rb);
675  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
676  change->data.msg.prefix = pstrdup(prefix);
677  change->data.msg.message_size = message_size;
678  change->data.msg.message = palloc(message_size);
679  memcpy(change->data.msg.message, message, message_size);
680 
681  ReorderBufferQueueChange(rb, xid, lsn, change);
682 
683  MemoryContextSwitchTo(oldcontext);
684  }
685  else
686  {
687  ReorderBufferTXN *txn = NULL;
688  volatile Snapshot snapshot_now = snapshot;
689 
690  if (xid != InvalidTransactionId)
691  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
692 
693  /* setup snapshot to allow catalog access */
694  SetupHistoricSnapshot(snapshot_now, NULL);
695  PG_TRY();
696  {
697  rb->message(rb, txn, lsn, false, prefix, message_size, message);
698 
699  TeardownHistoricSnapshot(false);
700  }
701  PG_CATCH();
702  {
703  TeardownHistoricSnapshot(true);
704  PG_RE_THROW();
705  }
706  PG_END_TRY();
707  }
708 }
struct ReorderBufferChange::@99::@102 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
union ReorderBufferChange::@99 data
#define PG_CATCH()
Definition: elog.h:305
#define Assert(condition)
Definition: c.h:738
#define PG_RE_THROW()
Definition: elog.h:336
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:949
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:295
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
char *  change 
)
static

Definition at line 2917 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2919 {
2920  ReorderBufferDiskChange *ondisk;
2921  ReorderBufferChange *change;
2922 
2923  ondisk = (ReorderBufferDiskChange *) data;
2924 
2925  change = ReorderBufferGetChange(rb);
2926 
2927  /* copy static part */
2928  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2929 
2930  data += sizeof(ReorderBufferDiskChange);
2931 
2932  /* restore individual stuff */
2933  switch (change->action)
2934  {
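2935  /* fall through these, they're all similar enough */
2936  case REORDER_BUFFER_CHANGE_INSERT:
2937  case REORDER_BUFFER_CHANGE_UPDATE:
2938  case REORDER_BUFFER_CHANGE_DELETE:
2939  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2940  if (change->data.tp.oldtuple)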
2941  {
2942  uint32 tuplelen = ((HeapTuple) data)->t_len;
2943 
2944  change->data.tp.oldtuple =
2945  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2946 
2947  /* restore ->tuple */
2948  memcpy(&change->data.tp.oldtuple->tuple, data,
2949  sizeof(HeapTupleData));
2950  data += sizeof(HeapTupleData);
2951 
2952  /* reset t_data pointer into the new tuplebuf */
2953  change->data.tp.oldtuple->tuple.t_data =
2954  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2955 
2956  /* restore tuple data itself */
2957  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2958  data += tuplelen;
2959  }
2960 
2961  if (change->data.tp.newtuple)
2962  {
2963  /* here, data might not be suitably aligned! */
2964  uint32 tuplelen;
2965 
2966  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2967  sizeof(uint32));
2968 
2969  change->data.tp.newtuple =
2970  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2971 
2972  /* restore ->tuple */
2973  memcpy(&change->data.tp.newtuple->tuple, data,
2974  sizeof(HeapTupleData));
2975  data += sizeof(HeapTupleData);
2976 
2977  /* reset t_data pointer into the new tuplebuf */
2978  change->data.tp.newtuple->tuple.t_data =
2979  ReorderBufferTupleBufData(change->data.tp.newtuple);
2980 
2981  /* restore tuple data itself */
2982  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2983  data += tuplelen;
2984  }
2985 
2986  break;
2987  case REORDER_BUFFER_CHANGE_MESSAGE:
2988  {
2989  Size prefix_size;
2990 
2991  /* read prefix */
2992  memcpy(&prefix_size, data, sizeof(Size));
2993  data += sizeof(Size);
2994  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2995  prefix_size);
2996  memcpy(change->data.msg.prefix, data, prefix_size);
2997  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2998  data += prefix_size;
2999 
3000  /* read the message */
3001  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3002  data += sizeof(Size);
3003  change->data.msg.message = MemoryContextAlloc(rb->context,
3004  change->data.msg.message_size);
3005  memcpy(change->data.msg.message, data,
3006  change->data.msg.message_size);
3007  data += change->data.msg.message_size;
3008 
3009  break;
3010  }
3011  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3012  {
3013  Snapshot oldsnap;
3014  Snapshot newsnap;
3015  Size size;
3016 
3017  oldsnap = (Snapshot) data;
3018 
3019  size = sizeof(SnapshotData) +
3020  sizeof(TransactionId) * oldsnap->xcnt +
3021  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3022 
3023  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3024 
3025  newsnap = change->data.snapshot;
3026 
3027  memcpy(newsnap, data, size);
3028  newsnap->xip = (TransactionId *)
3029  (((char *) newsnap) + sizeof(SnapshotData));
3030  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3031  newsnap->copied = true;
3032  break;
3033  }
3034  /* the base struct contains all the data, easy peasy */
3035  case REORDER_BUFFER_CHANGE_TRUNCATE:
3036  {
3037  Oid *relids;
3038 
3039  relids = ReorderBufferGetRelids(rb,
3040  change->data.truncate.nrelids);
3041  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3042  change->data.truncate.relids = relids;
3043 
3044  break;
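3045  }
3046  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3047  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3048  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3049  break;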
3050  }
3051 
3052  dlist_push_tail(&txn->changes, &change->node);
3053  txn->nentries_mem++;
3054 
3055  /*
3056  * Update memory accounting for the restored change. We need to do this
3057  * although we don't check the memory limit when restoring the changes in
3058  * this branch (we only do that when initially queueing the changes after
3059  * decoding), because we will release the changes later, and that will
3060  * update the accounting too (subtracting the size from the counters). And
3061  * we don't want to underflow there.
3062  */
3063  ReorderBufferChangeMemoryUpdate(rb, change, true);
3064 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
struct ReorderBufferChange::@99::@101 truncate
uint32 TransactionId
Definition: c.h:513
struct ReorderBufferChange::@99::@102 msg
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:367
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
union ReorderBufferChange::@99 data
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
struct ReorderBufferDiskChange ReorderBufferDiskChange
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
size_t Size
Definition: c.h:466
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:661
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
TXNEntryFile *  file,
XLogSegNo *  segno 
)
static

Definition at line 2776 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2778 {
2779  Size restored = 0;
2780  XLogSegNo last_segno;
2781  dlist_mutable_iter cleanup_iter;
2782  File *fd = &file->vfd;
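2783 
2784  Assert(txn->first_lsn != InvalidXLogRecPtr);
2785  Assert(txn->final_lsn != InvalidXLogRecPtr);
2786 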
2787  /* free current entries, so we have memory for more */
2788  dlist_foreach_modify(cleanup_iter, &txn->changes)
2789  {
2790  ReorderBufferChange *cleanup =
2791  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2792 
2793  dlist_delete(&cleanup->node);
2794  ReorderBufferReturnChange(rb, cleanup);
2795  }
2796  txn->nentries_mem = 0;
2797  Assert(dlist_is_empty(&txn->changes));
2798 
2799  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2800 
2801  while (restored < max_changes_in_memory && *segno <= last_segno)
2802  {
2803  int readBytes;
2804  ReorderBufferDiskChange *ondisk;
2805 
2806  if (*fd == -1)
2807  {
2808  char path[MAXPGPATH];
2809 
2810  /* first time in */
2811  if (*segno == 0)
2812  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2813 
2814  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2815 
2816  /*
2817  * No need to care about TLIs here, only used during a single run,
2818  * so each LSN only maps to a specific WAL record.
2819  */
2820  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2821  *segno);
2822 
2823  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2824 
2825  /* No harm in resetting the offset even in case of failure */
2826  file->curOffset = 0;
2827 
2828  if (*fd < 0 && errno == ENOENT)
2829  {
2830  *fd = -1;
2831  (*segno)++;
2832  continue;
2833  }
2834  else if (*fd < 0)
2835  ereport(ERROR,
2836  (errcode_for_file_access(),
2837  errmsg("could not open file \"%s\": %m",
2838  path)));
2839  }
2840 
2841  /*
2842  * Read the statically sized part of a change which has information
2843  * about the total size. If we couldn't read a record, we're at the
2844  * end of this file.
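2845  */
2846  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2847  readBytes = FileRead(file->vfd, rb->outbuf,
2848  sizeof(ReorderBufferDiskChange),
2849  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
2850 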
2851  /* eof */
2852  if (readBytes == 0)
2853  {
2854  FileClose(*fd);
2855  *fd = -1;
2856  (*segno)++;
2857  continue;
2858  }
2859  else if (readBytes < 0)
2860  ereport(ERROR,
2861  (errcode_for_file_access(),
2862  errmsg("could not read from reorderbuffer spill file: %m")));
2863  else if (readBytes != sizeof(ReorderBufferDiskChange))
2864  ereport(ERROR,
2865  (errcode_for_file_access(),
2866  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2867  readBytes,
2868  (uint32) sizeof(ReorderBufferDiskChange))));
2869 
2870  file->curOffset += readBytes;
2871 
2872  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2873 
2874  ReorderBufferSerializeReserve(rb,
2875  sizeof(ReorderBufferDiskChange) + ondisk->size);
2876  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2877 
2878  readBytes = FileRead(file->vfd,
2879  rb->outbuf + sizeof(ReorderBufferDiskChange),
2880  ondisk->size - sizeof(ReorderBufferDiskChange),
2881  file->curOffset,
2882  WAIT_EVENT_REORDER_BUFFER_READ);
2883 
2884  if (readBytes < 0)
2885  ereport(ERROR,
2886  (errcode_for_file_access(),
2887  errmsg("could not read from reorderbuffer spill file: %m")));
2888  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2889  ereport(ERROR,
2890  (errcode_for_file_access(),
2891  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2892  readBytes,
2893  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2894 
2895  file->curOffset += readBytes;
2896 
2897  /*
2898  * ok, read a full change from disk, now restore it into proper
2899  * in-memory format
2900  */
2901  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2902  restored++;
2903  }
2904 
2905  return restored;
2906 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1436
int wal_segment_size
Definition: xlog.c:116
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1233
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:367
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:893
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:144
struct ReorderBufferDiskChange ReorderBufferDiskChange
void FileClose(File file)
Definition: fd.c:1826
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:466
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:824
static const Size max_changes_in_memory
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1973
int File
Definition: fd.h:49
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 3070 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN().

3071 {
3072  XLogSegNo first;
3073  XLogSegNo cur;
3074  XLogSegNo last;
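3075 
3076  Assert(txn->first_lsn != InvalidXLogRecPtr);
3077  Assert(txn->final_lsn != InvalidXLogRecPtr);
3078 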
3079  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3080  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3081 
3082  /* iterate over all possible filenames, and delete them */
3083  for (cur = first; cur <= last; cur++)
3084  {
3085  char path[MAXPGPATH];
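3086 
3087  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
3088  if (unlink(path) != 0 && errno != ENOENT)
3089  ereport(ERROR,
3090  (errcode_for_file_access(),
3091  errmsg("could not remove file \"%s\": %m", path)));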
3092  }
3093 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:116
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer *  rb,
ReorderBufferChange *  change 
)

Definition at line 426 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

427 {
428  /* update memory accounting info */
429  ReorderBufferChangeMemoryUpdate(rb, change, false);
430 
431  /* free contained data */
432  switch (change->action)
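433  {
434  case REORDER_BUFFER_CHANGE_INSERT:
435  case REORDER_BUFFER_CHANGE_UPDATE:
436  case REORDER_BUFFER_CHANGE_DELETE:
437  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
438  if (change->data.tp.newtuple)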
439  {
440  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
441  change->data.tp.newtuple = NULL;
442  }
443 
444  if (change->data.tp.oldtuple)
445  {
446  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
447  change->data.tp.oldtuple = NULL;
448  }
449  break;
450  case REORDER_BUFFER_CHANGE_MESSAGE:
451  if (change->data.msg.prefix != NULL)
452  pfree(change->data.msg.prefix);
453  change->data.msg.prefix = NULL;
454  if (change->data.msg.message != NULL)
455  pfree(change->data.msg.message);
456  change->data.msg.message = NULL;
457  break;
458  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
459  if (change->data.snapshot)
460  {
461  ReorderBufferFreeSnap(rb, change->data.snapshot);
462  change->data.snapshot = NULL;
463  }
464  break;
465  /* no data in addition to the struct itself */
466  case REORDER_BUFFER_CHANGE_TRUNCATE:
467  if (change->data.truncate.relids != NULL)
468  {
469  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
470  change->data.truncate.relids = NULL;
471  }
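472  break;
473  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
474  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
475  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
476  break;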
477  }
478 
479  pfree(change);
480 }
struct ReorderBufferChange::@99::@101 truncate
struct ReorderBufferChange::@99::@102 msg
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void pfree(void *pointer)
Definition: mcxt.c:1056
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
union ReorderBufferChange::@99 data
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
struct ReorderBufferChange::@99::@100 tp
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer *  rb,
Oid *  relids 
)

Definition at line 539 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

540 {
541  pfree(relids);
542 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer *  rb,
ReorderBufferTupleBuf *  tuple 
)

Definition at line 508 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

509 {
510  pfree(tuple);
511 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 381 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

382 {
383  /* clean the lookup cache if we were cached (quite likely) */
384  if (rb->by_txn_last_xid == txn->xid)
385  {
386  rb->by_txn_last_xid = InvalidTransactionId;
387  rb->by_txn_last_txn = NULL;
388  }
389 
390  /* free data that's contained */
391 
392  if (txn->tuplecid_hash != NULL)
393  {
394  hash_destroy(txn->tuplecid_hash);
395  txn->tuplecid_hash = NULL;
396  }
397 
398  if (txn->invalidations)
399  {
400  pfree(txn->invalidations);
401  txn->invalidations = NULL;
402  }
403 
404  pfree(txn);
405 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:816
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1056
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  fd,
ReorderBufferChange *  change 
)
static

Definition at line 2500 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2502 {
2503  ReorderBufferDiskChange *ondisk;
2504  Size sz = sizeof(ReorderBufferDiskChange);
2505 
2506  ReorderBufferSerializeReserve(rb, sz);
2507 
2508  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2509  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2510 
2511  switch (change->action)
2512  {
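2513  /* fall through these, they're all similar enough */
2514  case REORDER_BUFFER_CHANGE_INSERT:
2515  case REORDER_BUFFER_CHANGE_UPDATE:
2516  case REORDER_BUFFER_CHANGE_DELETE:
2517  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2518  {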
2519  char *data;
2520  ReorderBufferTupleBuf *oldtup,
2521  *newtup;
2522  Size oldlen = 0;
2523  Size newlen = 0;
2524 
2525  oldtup = change->data.tp.oldtuple;
2526  newtup = change->data.tp.newtuple;
2527 
2528  if (oldtup)
2529  {
2530  sz += sizeof(HeapTupleData);
2531  oldlen = oldtup->tuple.t_len;
2532  sz += oldlen;
2533  }
2534 
2535  if (newtup)
2536  {
2537  sz += sizeof(HeapTupleData);
2538  newlen = newtup->tuple.t_len;
2539  sz += newlen;
2540  }
2541 
2542  /* make sure we have enough space */
2543  ReorderBufferSerializeReserve(rb, sz);
2544 
2545  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2546  /* might have been reallocated above */
2547  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2548 
2549  if (oldlen)
2550  {
2551  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2552  data += sizeof(HeapTupleData);
2553 
2554  memcpy(data, oldtup->tuple.t_data, oldlen);
2555  data += oldlen;
2556  }
2557 
2558  if (newlen)
2559  {
2560  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2561  data += sizeof(HeapTupleData);
2562 
2563  memcpy(data, newtup->tuple.t_data, newlen);
2564  data += newlen;
2565  }
2566  break;
2567  }
2568  case REORDER_BUFFER_CHANGE_MESSAGE:
2569  {
2570  char *data;
2571  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2572 
2573  sz += prefix_size + change->data.msg.message_size +
2574  sizeof(Size) + sizeof(Size);
2575  ReorderBufferSerializeReserve(rb, sz);
2576 
2577  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2578 
2579  /* might have been reallocated above */
2580  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2581 
2582  /* write the prefix including the size */
2583  memcpy(data, &prefix_size, sizeof(Size));
2584  data += sizeof(Size);
2585  memcpy(data, change->data.msg.prefix,
2586  prefix_size);
2587  data += prefix_size;
2588 
2589  /* write the message including the size */
2590  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2591  data += sizeof(Size);
2592  memcpy(data, change->data.msg.message,
2593  change->data.msg.message_size);
2594  data += change->data.msg.message_size;
2595 
2596  break;
2597  }
2598  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2599  {
2600  Snapshot snap;
2601  char *data;
2602 
2603  snap = change->data.snapshot;
2604 
2605  sz += sizeof(SnapshotData) +
2606  sizeof(TransactionId) * snap->xcnt +
2607  sizeof(TransactionId) * snap->subxcnt;
2608 
2609  /* make sure we have enough space */
2610  ReorderBufferSerializeReserve(rb, sz);
2611  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2612  /* might have been reallocated above */
2613  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2614 
2615  memcpy(data, snap, sizeof(SnapshotData));
2616  data += sizeof(SnapshotData);
2617 
2618  if (snap->xcnt)
2619  {
2620  memcpy(data, snap->xip,
2621  sizeof(TransactionId) * snap->xcnt);
2622  data += sizeof(TransactionId) * snap->xcnt;
2623  }
2624 
2625  if (snap->subxcnt)
2626  {
2627  memcpy(data, snap->subxip,
2628  sizeof(TransactionId) * snap->subxcnt);
2629  data += sizeof(TransactionId) * snap->subxcnt;
2630  }
2631  break;
2632  }
2633  case REORDER_BUFFER_CHANGE_TRUNCATE:
2634  {
2635  Size size;
2636  char *data;
2637 
2638  /* account for the OIDs of truncated relations */
2639  size = sizeof(Oid) * change->data.truncate.nrelids;
2640  sz += size;
2641 
2642  /* make sure we have enough space */
2643  ReorderBufferSerializeReserve(rb, sz);
2644 
2645  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2646  /* might have been reallocated above */
2647  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2648 
2649  memcpy(data, change->data.truncate.relids, size);
2650  data += size;
2651 
2652  break;
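2653  }
2654  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2655  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2656  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2657  /* ReorderBufferChange contains everything important */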
2658  break;
2659  }
2660 
2661  ondisk->size = sz;
2662 
2663  errno = 0;
2664  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2665  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2666  {
2667  int save_errno = errno;
2668 
2669  CloseTransientFile(fd);
2670 
2671  /* if write didn't set errno, assume problem is no disk space */
2672  errno = save_errno ? save_errno : ENOSPC;
2673  ereport(ERROR,
2674  (errcode_for_file_access(),
2675  errmsg("could not write to data file for XID %u: %m",
2676  txn->xid)));
2677  }
2678  pgstat_report_wait_end();
2679 
2680  /*
2681  * Keep the transaction's final_lsn up to date with each change we send to
2682  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
2683  * only do this on commit and abort records, but that doesn't work if a
2684  * system crash leaves a transaction without its abort record).
2685  *
2686  * Make sure not to move it backwards.
2687  */
2688  if (txn->final_lsn < change->lsn)
2689  txn->final_lsn = change->lsn;
2690 
2691  Assert(ondisk->change.action == change->action);
2692 }
struct ReorderBufferChange::@99::@101 truncate
uint32 TransactionId
Definition: c.h:513
#define write(a, b, c)
Definition: win32.h:14
struct ReorderBufferChange::@99::@102 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:68
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:633
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
TransactionId * xip
Definition: snapshot.h:168
int CloseTransientFile(int fd)
Definition: fd.c:2549
union ReorderBufferChange::@99 data
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:144
struct ReorderBufferDiskChange ReorderBufferDiskChange
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1357
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:824
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:180
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot *  slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 3139 of file reorderbuffer.c.

References ReplicationSlot::data, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

3141 {
3142  XLogRecPtr recptr;
3143 
3144  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
3145 
3146  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
3147  NameStr(MyReplicationSlot->data.name),
3148  xid,
3149  (uint32) (recptr >> 32), (uint32) recptr);
3150 }
int wal_segment_size
Definition: xlog.c:116
ReplicationSlotPersistentData data
Definition: slot.h:143
#define MAXPGPATH
unsigned int uint32
Definition: c.h:367
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define NameStr(name)
Definition: c.h:615
#define snprintf
Definition: port.h:193

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer *  rb,
Size  sz 
)
static

Definition at line 2311 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2312 {
2313  if (!rb->outbufsize)
2314  {
2315  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2316  rb->outbufsize = sz;
2317  }
2318  else if (rb->outbufsize < sz)
2319  {
2320  rb->outbuf = repalloc(rb->outbuf, sz);
2321  rb->outbufsize = sz;
2322  }
2323 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 2414 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

2415 {
2416  dlist_iter subtxn_i;
2417  dlist_mutable_iter change_i;
2418  int fd = -1;
2419  XLogSegNo curOpenSegNo = 0;
2420  Size spilled = 0;
2421  Size size = txn->size;
2422 
2423  elog(DEBUG2, "spill %u changes in XID %u to disk",
2424  (uint32) txn->nentries_mem, txn->xid);
2425 
2426  /* do the same to all child TXs */
2427  dlist_foreach(subtxn_i, &txn->subtxns)
2428  {
2429  ReorderBufferTXN *subtxn;
2430 
2431  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2432  ReorderBufferSerializeTXN(rb, subtxn);
2433  }
2434 
2435  /* serialize changestream */
2436  dlist_foreach_modify(change_i, &txn->changes)
2437  {
2438  ReorderBufferChange *change;
2439 
2440  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2441 
2442  /*
2443  * store in segment in which it belongs by start lsn, don't split over
2444  * multiple segments tho
2445  */
2446  if (fd == -1 ||
2447  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2448  {
2449  char path[MAXPGPATH];
2450 
2451  if (fd != -1)
2452  CloseTransientFile(fd);
2453 
2454  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2455 
2456  /*
2457  * No need to care about TLIs here, only used during a single run,
2458  * so each LSN only maps to a specific WAL record.
2459  */
2460  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2461  curOpenSegNo);
2462 
2463  /* open segment, create it if necessary */
2464  fd = OpenTransientFile(path,
2465  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2466 
2467  if (fd < 0)
2468  ereport(ERROR,
2469  (errcode_for_file_access(),
2470  errmsg("could not open file \"%s\": %m", path)));
2471  }
2472 
2473  ReorderBufferSerializeChange(rb, txn, fd, change);
2474  dlist_delete(&change->node);
2475  ReorderBufferReturnChange(rb, change);
2476 
2477  spilled++;
2478  }
2479 
2480  /* update the statistics */
2481  rb->spillCount += 1;
2482  rb->spillBytes += size;
2483 
2484  /* Don't consider already serialized transactions. */
2485  rb->spillTxns += rbtxn_is_serialized(txn) ? 0 : 1;
2486 
2487  Assert(spilled == txn->nentries_mem);
2488  Assert(dlist_is_empty(&txn->changes));
2489  txn->nentries_mem = 0;
2490  txn->txn_flags |= RBTXN_IS_SERIALIZED;
2491 
2492  if (fd != -1)
2493  CloseTransientFile(fd);
2494 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:116
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1233
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:367
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int CloseTransientFile(int fd)
Definition: fd.c:2549
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:466
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define RBTXN_IS_SERIALIZED
#define elog(elevel,...)
Definition: elog.h:214
#define rbtxn_is_serialized(txn)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2096 of file reorderbuffer.c.

References Assert, AssertArg, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

2098 {
2099  ReorderBufferTXN *txn;
2100  bool is_new;
2101 
2102  AssertArg(snap != NULL);
2103 
2104  /*
2105  * Fetch the transaction to operate on. If we know it's a subtransaction,
2106  * operate on its top-level transaction instead.
2107  */
2108  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2109  if (rbtxn_is_known_subxact(txn))
2110  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2111  NULL, InvalidXLogRecPtr, false);
2112  Assert(txn->base_snapshot == NULL);
2113 
2114  txn->base_snapshot = snap;
2115  txn->base_snapshot_lsn = lsn;
2116  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2117 
2118  AssertTXNLsnOrder(rb);
2119 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node base_snapshot_node
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
dlist_head txns_by_base_snapshot_lsn
#define AssertArg(condition)
Definition: c.h:740
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:738
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer *  rb,
XLogRecPtr  ptr 
)

Definition at line 818 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

819 {
820  rb->current_restart_decoding_lsn = ptr;
821 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change 
)
static

Definition at line 3212 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

3214 {
3215  ReorderBufferToastEnt *ent;
3216  ReorderBufferTupleBuf *newtup;
3217  bool found;
3218  int32 chunksize;
3219  bool isnull;
3220  Pointer chunk;
3221  TupleDesc desc = RelationGetDescr(relation);
3222  Oid chunk_id;
3223  int32 chunk_seq;
3224 
3225  if (txn->toast_hash == NULL)
3226  ReorderBufferToastInitHash(rb, txn);
3227 
3228  Assert(IsToastRelation(relation));
3229 
3230  newtup = change->data.tp.newtuple;
3231  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
3232  Assert(!isnull);
3233  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
3234  Assert(!isnull);
3235 
3236  ent = (ReorderBufferToastEnt *)
3237  hash_search(txn->toast_hash,
3238  (void *) &chunk_id,
3239  HASH_ENTER,
3240  &found);
3241 
3242  if (!found)
3243  {
3244  Assert(ent->chunk_id == chunk_id);
3245  ent->num_chunks = 0;
3246  ent->last_chunk_seq = 0;
3247  ent->size = 0;
3248  ent->reconstructed = NULL;
3249  dlist_init(&ent->chunks);
3250 
3251  if (chunk_seq != 0)
3252  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3253  chunk_seq, chunk_id);
3254  }
3255  else if (found && chunk_seq != ent->last_chunk_seq + 1)
3256  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3257  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3258 
3259  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3260  Assert(!isnull);
3261 
3262  /* calculate size so we can allocate the right size at once later */
3263  if (!VARATT_IS_EXTENDED(chunk))
3264  chunksize = VARSIZE(chunk) - VARHDRSZ;
3265  else if (VARATT_IS_SHORT(chunk))
3266  /* could happen due to heap_form_tuple doing its thing */
3267  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3268  else
3269  elog(ERROR, "unexpected type of toast chunk");
3270 
3271  ent->size += chunksize;
3272  ent->last_chunk_seq = chunk_seq;
3273  ent->num_chunks++;
3274  dlist_push_tail(&ent->chunks, &change->node);
3275 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:482
#define VARHDRSZ_SHORT
Definition: postgres.h:268
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARHDRSZ
Definition: c.h:561
#define DatumGetObjectId(X)
Definition: postgres.h:500
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:355
char * Pointer
Definition: c.h:344
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:29
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
union ReorderBufferChange::@99 data
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define DatumGetPointer(X)
Definition: postgres.h:549
#define elog(elevel,...)
Definition: elog.h:214
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 3191 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

3192 {
3193  HASHCTL hash_ctl;
3194 
3195  Assert(txn->toast_hash == NULL);
3196 
3197  memset(&hash_ctl, 0, sizeof(hash_ctl));
3198  hash_ctl.keysize = sizeof(Oid);
3199  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
3200  hash_ctl.hcxt = rb->context;
3201  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
3202  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3203 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:738

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change 
)
static

Definition at line 3298 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, elog, ERROR, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

3300 {
3301  TupleDesc desc;
3302  int natt;
3303  Datum *attrs;
3304  bool *isnull;
3305  bool *free;
3306  HeapTuple tmphtup;
3307  Relation toast_rel;
3308  TupleDesc toast_desc;
3309  MemoryContext oldcontext;
3310  ReorderBufferTupleBuf *newtup;
3311 
3312  /* no toast tuples changed */
3313  if (txn->toast_hash == NULL)
3314  return;
3315 
3316  /*
3317  * We're going to modify the size of the change, so to make sure the
3318  * accounting is correct we'll make it look like we're removing the change
3319  * now (with the old size), and then re-add it at the end.
3320  */
3321  ReorderBufferChangeMemoryUpdate(rb, change, false);
3322 
3323  oldcontext = MemoryContextSwitchTo(rb->context);
3324 
3325  /* we should only have toast tuples in an INSERT or UPDATE */
3326  Assert(change->data.tp.newtuple);
3327 
3328  desc = RelationGetDescr(relation);
3329 
3330  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3331  if (!RelationIsValid(toast_rel))
3332  elog(ERROR, "could not open relation with OID %u",
3333  relation->rd_rel->reltoastrelid);
3334 
3335  toast_desc = RelationGetDescr(toast_rel);
3336 
3337  /* should we allocate from stack instead? */
3338  attrs = palloc0(sizeof(Datum) * desc->natts);
3339  isnull = palloc0(sizeof(bool) * desc->natts);
3340  free = palloc0(sizeof(bool) * desc->natts);
3341 
3342  newtup = change->data.tp.newtuple;
3343 
3344  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3345 
3346  for (natt = 0; natt < desc->natts; natt++)
3347  {
3348  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3349  ReorderBufferToastEnt *ent;
3350  struct varlena *varlena;
3351 
3352  /* va_rawsize is the size of the original datum -- including header */
3353  struct varatt_external toast_pointer;
3354  struct varatt_indirect redirect_pointer;
3355  struct varlena *new_datum = NULL;
3356  struct varlena *reconstructed;
3357  dlist_iter it;
3358  Size data_done = 0;
3359 
3360  /* system columns aren't toasted */
3361  if (attr->attnum < 0)
3362  continue;
3363 
3364  if (attr->attisdropped)
3365  continue;
3366 
3367  /* not a varlena datatype */
3368  if (attr->attlen != -1)
3369  continue;
3370 
3371  /* no data */
3372  if (isnull[natt])
3373  continue;
3374 
3375  /* ok, we know we have a toast datum */
3376  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3377 
3378  /* no need to do anything if the tuple isn't external */
3379  if (!VARATT_IS_EXTERNAL(varlena))
3380  continue;
3381 
3382  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3383 
3384  /*
3385  * Check whether the toast tuple changed, replace if so.
3386  */
3387  ent = (ReorderBufferToastEnt *)
3388  hash_search(txn->toast_hash,
3389  (void *) &toast_pointer.va_valueid,
3390  HASH_FIND,
3391  NULL);
3392  if (ent == NULL)
3393  continue;
3394 
3395  new_datum =
3396  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3397 
3398  free[natt] = true;
3399 
3400  reconstructed = palloc0(toast_pointer.va_rawsize);
3401 
3402  ent->reconstructed = reconstructed;
3403 
3404  /* stitch toast tuple back together from its parts */
3405  dlist_foreach(it, &ent->chunks)
3406  {
3407  bool isnull;
3408  ReorderBufferChange *cchange;
3409  ReorderBufferTupleBuf *ctup;
3410  Pointer chunk;
3411 
3412  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3413  ctup = cchange->data.tp.newtuple;
3414  chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3415 
3416  Assert(!isnull);
3417  Assert(!VARATT_IS_EXTERNAL(chunk));
3418  Assert(!VARATT_IS_SHORT(chunk));
3419 
3420  memcpy(VARDATA(reconstructed) + data_done,
3421  VARDATA(chunk),
3422  VARSIZE(chunk) - VARHDRSZ);
3423  data_done += VARSIZE(chunk) - VARHDRSZ;
3424  }
3425  Assert(data_done == toast_pointer.va_extsize);
3426 
3427  /* make sure its marked as compressed or not */
3428  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3429  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3430  else
3431  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3432 
3433  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3434  redirect_pointer.pointer = reconstructed;
3435 
3436  SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3437  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3438  sizeof(redirect_pointer));
3439 
3440  attrs[natt] = PointerGetDatum(new_datum);
3441  }
3442 
3443  /*
3444  * Build tuple in separate memory & copy tuple back into the tuplebuf
3445  * passed to the output plugin. We can't directly heap_fill_tuple() into
3446  * the tuplebuf because attrs[] will point back into the current content.
3447  */
3448  tmphtup = heap_form_tuple(desc, attrs, isnull);
3449  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3450  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3451 
3452  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3453  newtup->tuple.t_len = tmphtup->t_len;
3454 
3455  /*
3456  * free resources we won't further need, more persistent stuff will be
3457  * free'd in ReorderBufferToastReset().
3458  */
3459  RelationClose(toast_rel);
3460  pfree(tmphtup);
3461  for (natt = 0; natt < desc->natts; natt++)
3462  {
3463  if (free[natt])
3464  pfree(DatumGetPointer(attrs[natt]));
3465  }
3466  pfree(attrs);
3467  pfree(free);
3468  pfree(isnull);
3469 
3470  MemoryContextSwitchTo(oldcontext);
3471 
3472  /* now add the change back, with the co