PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNStateReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3561 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3562 {
3563  char path[MAXPGPATH];
3564  int fd;
3565  int readBytes;
3567 
3568  sprintf(path, "pg_logical/mappings/%s", fname);
3569  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3570  if (fd < 0)
3571  ereport(ERROR,
3573  errmsg("could not open file \"%s\": %m", path)));
3574 
3575  while (true)
3576  {
3579  ReorderBufferTupleCidEnt *new_ent;
3580  bool found;
3581 
3582  /* be careful about padding */
3583  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3584 
3585  /* read all mappings till the end of the file */
3587  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3589 
3590  if (readBytes < 0)
3591  ereport(ERROR,
3593  errmsg("could not read file \"%s\": %m",
3594  path)));
3595  else if (readBytes == 0) /* EOF */
3596  break;
3597  else if (readBytes != sizeof(LogicalRewriteMappingData))
3598  ereport(ERROR,
3600  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3601  path, readBytes,
3602  (int32) sizeof(LogicalRewriteMappingData))));
3603 
3604  key.relnode = map.old_node;
3605  ItemPointerCopy(&map.old_tid,
3606  &key.tid);
3607 
3608 
3609  ent = (ReorderBufferTupleCidEnt *)
3610  hash_search(tuplecid_data,
3611  (void *) &key,
3612  HASH_FIND,
3613  NULL);
3614 
3615  /* no existing mapping, no need to update */
3616  if (!ent)
3617  continue;
3618 
3619  key.relnode = map.new_node;
3620  ItemPointerCopy(&map.new_tid,
3621  &key.tid);
3622 
3623  new_ent = (ReorderBufferTupleCidEnt *)
3624  hash_search(tuplecid_data,
3625  (void *) &key,
3626  HASH_ENTER,
3627  &found);
3628 
3629  if (found)
3630  {
3631  /*
3632  * Make sure the existing mapping makes sense. We sometime update
3633  * old records that did not yet have a cmax (e.g. pg_class' own
3634  * entry while rewriting it) during rewrites, so allow that.
3635  */
3636  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3637  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3638  }
3639  else
3640  {
3641  /* update mapping */
3642  new_ent->cmin = ent->cmin;
3643  new_ent->cmax = ent->cmax;
3644  new_ent->combocid = ent->combocid;
3645  }
3646  }
3647 
3648  if (CloseTransientFile(fd) != 0)
3649  ereport(ERROR,
3651  errmsg("could not close file \"%s\": %m", path)));
3652 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
signed int int32
Definition: c.h:347
#define sprintf
Definition: port.h:194
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define InvalidCommandId
Definition: c.h:531
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:739
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 710 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

711 {
712 #ifdef USE_ASSERT_CHECKING
713  dlist_iter iter;
714  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
715  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
716 
717  dlist_foreach(iter, &rb->toplevel_by_lsn)
718  {
720  iter.cur);
721 
722  /* start LSN must be set */
723  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
724 
725  /* If there is an end LSN, it must be higher than start LSN */
726  if (cur_txn->end_lsn != InvalidXLogRecPtr)
727  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
728 
729  /* Current initial LSN must be strictly higher than previous */
730  if (prev_first_lsn != InvalidXLogRecPtr)
731  Assert(prev_first_lsn < cur_txn->first_lsn);
732 
733  /* known-as-subtxn txns must not be listed */
734  Assert(!cur_txn->is_known_as_subxact);
735 
736  prev_first_lsn = cur_txn->first_lsn;
737  }
738 
740  {
742  base_snapshot_node,
743  iter.cur);
744 
745  /* base snapshot (and its LSN) must be set */
746  Assert(cur_txn->base_snapshot != NULL);
748 
749  /* current LSN must be strictly higher than previous */
750  if (prev_base_snap_lsn != InvalidXLogRecPtr)
751  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
752 
753  /* known-as-subtxn txns must not be listed */
754  Assert(!cur_txn->is_known_as_subxact);
755 
756  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
757  }
758 #endif
759 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
XLogRecPtr end_lsn

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 3669 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3670 {
3673 
3674  if (a->lsn < b->lsn)
3675  return -1;
3676  else if (a->lsn > b->lsn)
3677  return 1;
3678  return 0;
3679 }
#define lfirst(lc)
Definition: pg_list.h:190

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1914 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferCleanupTXN(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

1915 {
1916  ReorderBufferTXN *txn;
1917 
1918  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1919  false);
1920 
1921  /* unknown, nothing to remove */
1922  if (txn == NULL)
1923  return;
1924 
1925  /* cosmetic... */
1926  txn->final_lsn = lsn;
1927 
1928  /* remove potential on-disk data, and deallocate */
1929  ReorderBufferCleanupTXN(rb, txn);
1930 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1940 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, dlist_tail_element, elog, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferCleanupTXN(), ReorderBufferTXN::serialized, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1941 {
1942  dlist_mutable_iter it;
1943 
1944  /*
1945  * Iterate through all (potential) toplevel TXNs and abort all that are
1946  * older than what possibly can be running. Once we've found the first
1947  * that is alive we stop, there might be some that acquired an xid earlier
1948  * but started writing later, but it's unlikely and they will be cleaned
1949  * up in a later call to this function.
1950  */
1952  {
1953  ReorderBufferTXN *txn;
1954 
1955  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1956 
1957  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1958  {
1959  /*
1960  * We set final_lsn on a transaction when we decode its commit or
1961  * abort record, but we never see those records for crashed
1962  * transactions. To ensure cleanup of these transactions, set
1963  * final_lsn to that of their last change; this causes
1964  * ReorderBufferRestoreCleanup to do the right thing.
1965  */
1966  if (txn->serialized && txn->final_lsn == 0)
1967  {
1968  ReorderBufferChange *last =
1970 
1971  txn->final_lsn = last->lsn;
1972  }
1973 
1974  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1975 
1976  /* remove potential on-disk data, and deallocate this tx */
1977  ReorderBufferCleanupTXN(rb, txn);
1978  }
1979  else
1980  return;
1981  }
1982 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
dlist_head changes
#define DEBUG2
Definition: elog.h:24
XLogRecPtr final_lsn
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:228

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 2216 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2219 {
2220  ReorderBufferTXN *txn;
2221 
2222  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2223 
2224  if (txn->ninvalidations != 0)
2225  elog(ERROR, "only ever add one set of invalidations");
2226 
2227  Assert(nmsgs > 0);
2228 
2229  txn->ninvalidations = nmsgs;
2232  sizeof(SharedInvalidationMessage) * nmsgs);
2233  memcpy(txn->invalidations, msgs,
2234  sizeof(SharedInvalidationMessage) * nmsgs);
2235 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define Assert(condition)
Definition: c.h:739
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define elog(elevel,...)
Definition: elog.h:228

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2132 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2134 {
2136 
2137  change->data.command_id = cid;
2139 
2140  ReorderBufferQueueChange(rb, xid, lsn, change);
2141 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2187 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2191 {
2193  ReorderBufferTXN *txn;
2194 
2195  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2196 
2197  change->data.tuplecid.node = node;
2198  change->data.tuplecid.tid = tid;
2199  change->data.tuplecid.cmin = cmin;
2200  change->data.tuplecid.cmax = cmax;
2201  change->data.tuplecid.combocid = combocid;
2202  change->lsn = lsn;
2203  change->txn = txn;
2205 
2206  dlist_push_tail(&txn->tuplecids, &change->node);
2207  txn->ntuplecids++;
2208 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@101::@105 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2083 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2085 {
2087 
2088  change->data.snapshot = snap;
2090 
2091  ReorderBufferQueueChange(rb, xid, lsn, change);
2092 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@101 data
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 263 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

264 {
265  ReorderBuffer *buffer;
266  HASHCTL hash_ctl;
267  MemoryContext new_ctx;
268 
269  Assert(MyReplicationSlot != NULL);
270 
271  /* allocate memory in own context, to have better accountability */
273  "ReorderBuffer",
275 
276  buffer =
277  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
278 
279  memset(&hash_ctl, 0, sizeof(hash_ctl));
280 
281  buffer->context = new_ctx;
282 
283  buffer->change_context = SlabContextCreate(new_ctx,
284  "Change",
286  sizeof(ReorderBufferChange));
287 
288  buffer->txn_context = SlabContextCreate(new_ctx,
289  "TXN",
291  sizeof(ReorderBufferTXN));
292 
293  buffer->tup_context = GenerationContextCreate(new_ctx,
294  "Tuples",
296 
297  hash_ctl.keysize = sizeof(TransactionId);
298  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
299  hash_ctl.hcxt = buffer->context;
300 
301  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
303 
305  buffer->by_txn_last_txn = NULL;
306 
307  buffer->outbuf = NULL;
308  buffer->outbufsize = 0;
309  buffer->size = 0;
310 
311  buffer->spillCount = 0;
312  buffer->spillTxns = 0;
313  buffer->spillBytes = 0;
314 
316 
317  dlist_init(&buffer->toplevel_by_lsn);
319 
320  /*
321  * Ensure there's no stale data from prior uses of this slot, in case some
322  * prior exit avoided calling ReorderBufferFree. Failure to do this can
323  * produce duplicated txns, and it's very cheap if there's nothing there.
324  */
326 
327  return buffer;
328 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:514
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:187
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define NameStr(name)
Definition: c.h:616
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 821 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXNByIdEnt::xid.

Referenced by DecodeXactOp(), and ReorderBufferCommitChild().

823 {
824  ReorderBufferTXN *txn;
825  ReorderBufferTXN *subtxn;
826  bool new_top;
827  bool new_sub;
828 
829  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
830  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
831 
832  if (new_top && !new_sub)
833  elog(ERROR, "subtransaction logged without previous top-level txn record");
834 
835  if (!new_sub)
836  {
837  if (subtxn->is_known_as_subxact)
838  {
839  /* already associated, nothing to do */
840  return;
841  }
842  else
843  {
844  /*
845  * We already saw this transaction, but initially added it to the
846  * list of top-level txns. Now that we know it's not top-level,
847  * remove it from there.
848  */
849  dlist_delete(&subtxn->node);
850  }
851  }
852 
853  subtxn->is_known_as_subxact = true;
854  subtxn->toplevel_xid = xid;
855  Assert(subtxn->nsubtxns == 0);
856 
857  /* add to subtransaction list */
858  dlist_push_tail(&txn->subtxns, &subtxn->node);
859  txn->nsubtxns++;
860 
861  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
863 
864  /* Verify LSN-ordering invariant */
865  AssertTXNLsnOrder(rb);
866 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog(elevel,...)
Definition: elog.h:228
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1333 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1334 {
1335  dlist_iter iter;
1336  HASHCTL hash_ctl;
1337 
1338  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1339  return;
1340 
1341  memset(&hash_ctl, 0, sizeof(hash_ctl));
1342 
1343  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1344  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1345  hash_ctl.hcxt = rb->context;
1346 
1347  /*
1348  * create the hash with the exact number of to-be-stored tuplecids from
1349  * the start
1350  */
1351  txn->tuplecid_hash =
1352  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1354 
1355  dlist_foreach(iter, &txn->tuplecids)
1356  {
1359  bool found;
1360  ReorderBufferChange *change;
1361 
1362  change = dlist_container(ReorderBufferChange, node, iter.cur);
1363 
1365 
1366  /* be careful about padding */
1367  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1368 
1369  key.relnode = change->data.tuplecid.node;
1370 
1371  ItemPointerCopy(&change->data.tuplecid.tid,
1372  &key.tid);
1373 
1374  ent = (ReorderBufferTupleCidEnt *)
1376  (void *) &key,
1378  &found);
1379  if (!found)
1380  {
1381  ent->cmin = change->data.tuplecid.cmin;
1382  ent->cmax = change->data.tuplecid.cmax;
1383  ent->combocid = change->data.tuplecid.combocid;
1384  }
1385  else
1386  {
1387  /*
1388  * Maybe we already saw this tuple before in this transaction, but
1389  * if so it must have the same cmin.
1390  */
1391  Assert(ent->cmin == change->data.tuplecid.cmin);
1392 
1393  /*
1394  * cmax may be initially invalid, but once set it can only grow,
1395  * and never become invalid again.
1396  */
1397  Assert((ent->cmax == InvalidCommandId) ||
1398  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1399  (change->data.tuplecid.cmax > ent->cmax)));
1400  ent->cmax = change->data.tuplecid.cmax;
1401  }
1402  }
1403 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
struct ReorderBufferChange::@101::@105 tuplecid
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:531
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 2148 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

2151 {
2152  Size sz;
2153 
2154  Assert(change->txn);
2155 
2156  /*
2157  * Ignore tuple CID changes, because those are not evicted when reaching
2158  * memory limit. So we just don't count them, because it might easily
2159  * trigger a pointless attempt to spill.
2160  */
2162  return;
2163 
2164  sz = ReorderBufferChangeSize(change);
2165 
2166  if (addition)
2167  {
2168  change->txn->size += sz;
2169  rb->size += sz;
2170  }
2171  else
2172  {
2173  Assert((rb->size >= sz) && (change->txn->size >= sz));
2174  change->txn->size -= sz;
2175  rb->size -= sz;
2176  }
2177 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:739
size_t Size
Definition: c.h:467

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 2689 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

2690 {
2691  Size sz = sizeof(ReorderBufferChange);
2692 
2693  switch (change->action)
2694  {
2695  /* fall through these, they're all similar enough */
2700  {
2701  ReorderBufferTupleBuf *oldtup,
2702  *newtup;
2703  Size oldlen = 0;
2704  Size newlen = 0;
2705 
2706  oldtup = change->data.tp.oldtuple;
2707  newtup = change->data.tp.newtuple;
2708 
2709  if (oldtup)
2710  {
2711  sz += sizeof(HeapTupleData);
2712  oldlen = oldtup->tuple.t_len;
2713  sz += oldlen;
2714  }
2715 
2716  if (newtup)
2717  {
2718  sz += sizeof(HeapTupleData);
2719  newlen = newtup->tuple.t_len;
2720  sz += newlen;
2721  }
2722 
2723  break;
2724  }
2726  {
2727  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2728 
2729  sz += prefix_size + change->data.msg.message_size +
2730  sizeof(Size) + sizeof(Size);
2731 
2732  break;
2733  }
2735  {
2736  Snapshot snap;
2737 
2738  snap = change->data.snapshot;
2739 
2740  sz += sizeof(SnapshotData) +
2741  sizeof(TransactionId) * snap->xcnt +
2742  sizeof(TransactionId) * snap->subxcnt;
2743 
2744  break;
2745  }
2747  {
2748  sz += sizeof(Oid) * change->data.truncate.nrelids;
2749 
2750  break;
2751  }
2755  /* ReorderBufferChange contains everything important */
2756  break;
2757  }
2758 
2759  return sz;
2760 }
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange::@101::@103 truncate
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 2375 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

2376 {
2377  ReorderBufferTXN *txn;
2378 
2379  /* bail out if we haven't exceeded the memory limit */
2380  if (rb->size < logical_decoding_work_mem * 1024L)
2381  return;
2382 
2383  /*
2384  * Pick the largest transaction (or subtransaction) and evict it from
2385  * memory by serializing it to disk.
2386  */
2387  txn = ReorderBufferLargestTXN(rb);
2388 
2389  ReorderBufferSerializeTXN(rb, txn);
2390 
2391  /*
2392  * After eviction, the transaction should have no entries in memory, and
2393  * should use 0 bytes for changes.
2394  */
2395  Assert(txn->size == 0);
2396  Assert(txn->nentries_mem == 0);
2397 
2398  /*
2399  * And furthermore, evicting the transaction should get us below the
2400  * memory limit again - it is not possible that we're still exceeding the
2401  * memory limit after evicting the transaction.
2402  *
2403  * This follows from the simple fact that the selected transaction is at
2404  * least as large as the most recent change (which caused us to go over
2405  * the memory limit). So by evicting it we're definitely back below the
2406  * memory limit.
2407  */
2408  Assert(rb->size < logical_decoding_work_mem * 1024L);
2409 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:739
int logical_decoding_work_mem

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 3081 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

3082 {
3083  DIR *spill_dir;
3084  struct dirent *spill_de;
3085  struct stat statbuf;
3086  char path[MAXPGPATH * 2 + 12];
3087 
3088  sprintf(path, "pg_replslot/%s", slotname);
3089 
3090  /* we're only handling directories here, skip if it's not ours */
3091  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3092  return;
3093 
3094  spill_dir = AllocateDir(path);
3095  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3096  {
3097  /* only look at names that can be ours */
3098  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3099  {
3100  snprintf(path, sizeof(path),
3101  "pg_replslot/%s/%s", slotname,
3102  spill_de->d_name);
3103 
3104  if (unlink(path) != 0)
3105  ereport(ERROR,
3107  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3108  path, slotname)));
3109  }
3110  }
3111  FreeDir(spill_dir);
3112 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2584
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:194
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2503
#define ereport(elevel, rest)
Definition: elog.h:141
#define stat(a, b)
Definition: win32_port.h:255
#define S_ISDIR(m)
Definition: win32_port.h:296
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:822
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:192
int FreeDir(DIR *dir)
Definition: fd.c:2621

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1241 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::serialized, SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1242 {
1243  bool found;
1244  dlist_mutable_iter iter;
1245 
1246  /* cleanup subtransactions & their changes */
1247  dlist_foreach_modify(iter, &txn->subtxns)
1248  {
1249  ReorderBufferTXN *subtxn;
1250 
1251  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1252 
1253  /*
1254  * Subtransactions are always associated to the toplevel TXN, even if
1255  * they originally were happening inside another subtxn, so we won't
1256  * ever recurse more than one level deep here.
1257  */
1258  Assert(subtxn->is_known_as_subxact);
1259  Assert(subtxn->nsubtxns == 0);
1260 
1261  ReorderBufferCleanupTXN(rb, subtxn);
1262  }
1263 
1264  /* cleanup changes in the toplevel txn */
1265  dlist_foreach_modify(iter, &txn->changes)
1266  {
1267  ReorderBufferChange *change;
1268 
1269  change = dlist_container(ReorderBufferChange, node, iter.cur);
1270 
1271  /* Check we're not mixing changes from different transactions. */
1272  Assert(change->txn == txn);
1273 
1274  ReorderBufferReturnChange(rb, change);
1275  }
1276 
1277  /*
1278  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1279  * They are always stored in the toplevel transaction.
1280  */
1281  dlist_foreach_modify(iter, &txn->tuplecids)
1282  {
1283  ReorderBufferChange *change;
1284 
1285  change = dlist_container(ReorderBufferChange, node, iter.cur);
1286 
1287  /* Check we're not mixing changes from different transactions. */
1288  Assert(change->txn == txn);
1290 
1291  ReorderBufferReturnChange(rb, change);
1292  }
1293 
1294  /*
1295  * Cleanup the base snapshot, if set.
1296  */
1297  if (txn->base_snapshot != NULL)
1298  {
1301  }
1302 
1303  /*
1304  * Remove TXN from its containing list.
1305  *
1306  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1307  * parent's list of known subxacts; this leaves the parent's nsubxacts
1308  * count too high, but we don't care. Otherwise, we are deleting the TXN
1309  * from the LSN-ordered list of toplevel TXNs.
1310  */
1311  dlist_delete(&txn->node);
1312 
1313  /* now remove reference from buffer */
1314  hash_search(rb->by_txn,
1315  (void *) &txn->xid,
1316  HASH_REMOVE,
1317  &found);
1318  Assert(found);
1319 
1320  /* remove entries spilled to disk */
1321  if (txn->serialized)
1322  ReorderBufferRestoreCleanup(rb, txn);
1323 
1324  /* deallocate */
1325  ReorderBufferReturnTXN(rb, txn);
1326 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:739
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1492 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, ReorderBuffer::apply_truncate, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1496 {
1497  ReorderBufferTXN *txn;
1498  volatile Snapshot snapshot_now;
1499  volatile CommandId command_id = FirstCommandId;
1500  bool using_subtxn;
1501  ReorderBufferIterTXNState *volatile iterstate = NULL;
1502 
1503  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1504  false);
1505 
1506  /* unknown transaction, nothing to replay */
1507  if (txn == NULL)
1508  return;
1509 
1510  txn->final_lsn = commit_lsn;
1511  txn->end_lsn = end_lsn;
1512  txn->commit_time = commit_time;
1513  txn->origin_id = origin_id;
1514  txn->origin_lsn = origin_lsn;
1515 
1516  /*
1517  * If this transaction has no snapshot, it didn't make any changes to the
1518  * database, so there's nothing to decode. Note that
1519  * ReorderBufferCommitChild will have transferred any snapshots from
1520  * subtransactions if there were any.
1521  */
1522  if (txn->base_snapshot == NULL)
1523  {
1524  Assert(txn->ninvalidations == 0);
1525  ReorderBufferCleanupTXN(rb, txn);
1526  return;
1527  }
1528 
1529  snapshot_now = txn->base_snapshot;
1530 
1531  /* build data to be able to lookup the CommandIds of catalog tuples */
1533 
1534  /* setup the initial snapshot */
1535  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1536 
1537  /*
1538  * Decoding needs access to syscaches et al., which in turn use
1539  * heavyweight locks and such. Thus we need to have enough state around to
1540  * keep track of those. The easiest way is to simply use a transaction
1541  * internally. That also allows us to easily enforce that nothing writes
1542  * to the database by checking for xid assignments.
1543  *
1544  * When we're called via the SQL SRF there's already a transaction
1545  * started, so start an explicit subtransaction there.
1546  */
1547  using_subtxn = IsTransactionOrTransactionBlock();
1548 
1549  PG_TRY();
1550  {
1551  ReorderBufferChange *change;
1552  ReorderBufferChange *specinsert = NULL;
1553 
1554  if (using_subtxn)
1555  BeginInternalSubTransaction("replay");
1556  else
1558 
1559  rb->begin(rb, txn);
1560 
1561  iterstate = ReorderBufferIterTXNInit(rb, txn);
1562  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1563  {
1564  Relation relation = NULL;
1565  Oid reloid;
1566 
1567  switch (change->action)
1568  {
1570 
1571  /*
1572  * Confirmation for speculative insertion arrived. Simply
1573  * use as a normal record. It'll be cleaned up at the end
1574  * of INSERT processing.
1575  */
1576  if (specinsert == NULL)
1577  elog(ERROR, "invalid ordering of speculative insertion changes");
1578  Assert(specinsert->data.tp.oldtuple == NULL);
1579  change = specinsert;
1581 
1582  /* intentionally fall through */
1586  Assert(snapshot_now);
1587 
1588  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1589  change->data.tp.relnode.relNode);
1590 
1591  /*
1592  * Mapped catalog tuple without data, emitted while
1593  * catalog table was in the process of being rewritten. We
1594  * can fail to look up the relfilenode, because the
1595  * relmapper has no "historic" view, in contrast to normal
1596  * the normal catalog during decoding. Thus repeated
1597  * rewrites can cause a lookup failure. That's OK because
1598  * we do not decode catalog changes anyway. Normally such
1599  * tuples would be skipped over below, but we can't
1600  * identify whether the table should be logically logged
1601  * without mapping the relfilenode to the oid.
1602  */
1603  if (reloid == InvalidOid &&
1604  change->data.tp.newtuple == NULL &&
1605  change->data.tp.oldtuple == NULL)
1606  goto change_done;
1607  else if (reloid == InvalidOid)
1608  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1609  relpathperm(change->data.tp.relnode,
1610  MAIN_FORKNUM));
1611 
1612  relation = RelationIdGetRelation(reloid);
1613 
1614  if (!RelationIsValid(relation))
1615  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1616  reloid,
1617  relpathperm(change->data.tp.relnode,
1618  MAIN_FORKNUM));
1619 
1620  if (!RelationIsLogicallyLogged(relation))
1621  goto change_done;
1622 
1623  /*
1624  * Ignore temporary heaps created during DDL unless the
1625  * plugin has asked for them.
1626  */
1627  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1628  goto change_done;
1629 
1630  /*
1631  * For now ignore sequence changes entirely. Most of the
1632  * time they don't log changes using records we
1633  * understand, so it doesn't make sense to handle the few
1634  * cases we do.
1635  */
1636  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1637  goto change_done;
1638 
1639  /* user-triggered change */
1640  if (!IsToastRelation(relation))
1641  {
1642  ReorderBufferToastReplace(rb, txn, relation, change);
1643  rb->apply_change(rb, txn, relation, change);
1644 
1645  /*
1646  * Only clear reassembled toast chunks if we're sure
1647  * they're not required anymore. The creator of the
1648  * tuple tells us.
1649  */
1650  if (change->data.tp.clear_toast_afterwards)
1651  ReorderBufferToastReset(rb, txn);
1652  }
1653  /* we're not interested in toast deletions */
1654  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1655  {
1656  /*
1657  * Need to reassemble the full toasted Datum in
1658  * memory, to ensure the chunks don't get reused till
1659  * we're done remove it from the list of this
1660  * transaction's changes. Otherwise it will get
1661  * freed/reused while restoring spooled data from
1662  * disk.
1663  */
1664  Assert(change->data.tp.newtuple != NULL);
1665 
1666  dlist_delete(&change->node);
1667  ReorderBufferToastAppendChunk(rb, txn, relation,
1668  change);
1669  }
1670 
1671  change_done:
1672 
1673  /*
1674  * Either speculative insertion was confirmed, or it was
1675  * unsuccessful and the record isn't needed anymore.
1676  */
1677  if (specinsert != NULL)
1678  {
1679  ReorderBufferReturnChange(rb, specinsert);
1680  specinsert = NULL;
1681  }
1682 
1683  if (relation != NULL)
1684  {
1685  RelationClose(relation);
1686  relation = NULL;
1687  }
1688  break;
1689 
1691 
1692  /*
1693  * Speculative insertions are dealt with by delaying the
1694  * processing of the insert until the confirmation record
1695  * arrives. For that we simply unlink the record from the
1696  * chain, so it does not get freed/reused while restoring
1697  * spooled data from disk.
1698  *
1699  * This is safe in the face of concurrent catalog changes
1700  * because the relevant relation can't be changed between
1701  * speculative insertion and confirmation due to
1702  * CheckTableNotInUse() and locking.
1703  */
1704 
1705  /* clear out a pending (and thus failed) speculation */
1706  if (specinsert != NULL)
1707  {
1708  ReorderBufferReturnChange(rb, specinsert);
1709  specinsert = NULL;
1710  }
1711 
1712  /* and memorize the pending insertion */
1713  dlist_delete(&change->node);
1714  specinsert = change;
1715  break;
1716 
1718  {
1719  int i;
1720  int nrelids = change->data.truncate.nrelids;
1721  int nrelations = 0;
1722  Relation *relations;
1723 
1724  relations = palloc0(nrelids * sizeof(Relation));
1725  for (i = 0; i < nrelids; i++)
1726  {
1727  Oid relid = change->data.truncate.relids[i];
1728  Relation relation;
1729 
1730  relation = RelationIdGetRelation(relid);
1731 
1732  if (!RelationIsValid(relation))
1733  elog(ERROR, "could not open relation with OID %u", relid);
1734 
1735  if (!RelationIsLogicallyLogged(relation))
1736  continue;
1737 
1738  relations[nrelations++] = relation;
1739  }
1740 
1741  rb->apply_truncate(rb, txn, nrelations, relations, change);
1742 
1743  for (i = 0; i < nrelations; i++)
1744  RelationClose(relations[i]);
1745 
1746  break;
1747  }
1748 
1750  rb->message(rb, txn, change->lsn, true,
1751  change->data.msg.prefix,
1752  change->data.msg.message_size,
1753  change->data.msg.message);
1754  break;
1755 
1757  /* get rid of the old */
1758  TeardownHistoricSnapshot(false);
1759 
1760  if (snapshot_now->copied)
1761  {
1762  ReorderBufferFreeSnap(rb, snapshot_now);
1763  snapshot_now =
1764  ReorderBufferCopySnap(rb, change->data.snapshot,
1765  txn, command_id);
1766  }
1767 
1768  /*
1769  * Restored from disk, need to be careful not to double
1770  * free. We could introduce refcounting for that, but for
1771  * now this seems infrequent enough not to care.
1772  */
1773  else if (change->data.snapshot->copied)
1774  {
1775  snapshot_now =
1776  ReorderBufferCopySnap(rb, change->data.snapshot,
1777  txn, command_id);
1778  }
1779  else
1780  {
1781  snapshot_now = change->data.snapshot;
1782  }
1783 
1784 
1785  /* and continue with the new one */
1786  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1787  break;
1788 
1790  Assert(change->data.command_id != InvalidCommandId);
1791 
1792  if (command_id < change->data.command_id)
1793  {
1794  command_id = change->data.command_id;
1795 
1796  if (!snapshot_now->copied)
1797  {
1798  /* we don't use the global one anymore */
1799  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1800  txn, command_id);
1801  }
1802 
1803  snapshot_now->curcid = command_id;
1804 
1805  TeardownHistoricSnapshot(false);
1806  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1807 
1808  /*
1809  * Every time the CommandId is incremented, we could
1810  * see new catalog contents, so execute all
1811  * invalidations.
1812  */
1814  }
1815 
1816  break;
1817 
1819  elog(ERROR, "tuplecid value in changequeue");
1820  break;
1821  }
1822  }
1823 
1824  /*
1825  * There's a speculative insertion remaining, just clean in up, it
1826  * can't have been successful, otherwise we'd gotten a confirmation
1827  * record.
1828  */
1829  if (specinsert)
1830  {
1831  ReorderBufferReturnChange(rb, specinsert);
1832  specinsert = NULL;
1833  }
1834 
1835  /* clean up the iterator */
1836  ReorderBufferIterTXNFinish(rb, iterstate);
1837  iterstate = NULL;
1838 
1839  /* call commit callback */
1840  rb->commit(rb, txn, commit_lsn);
1841 
1842  /* this is just a sanity check against bad output plugin behaviour */
1844  elog(ERROR, "output plugin used XID %u",
1846 
1847  /* cleanup */
1848  TeardownHistoricSnapshot(false);
1849 
1850  /*
1851  * Aborting the current (sub-)transaction as a whole has the right
1852  * semantics. We want all locks acquired in here to be released, not
1853  * reassigned to the parent and we do not want any database access
1854  * have persistent effects.
1855  */
1857 
1858  /* make sure there's no cache pollution */
1860 
1861  if (using_subtxn)
1863 
1864  if (snapshot_now->copied)
1865  ReorderBufferFreeSnap(rb, snapshot_now);
1866 
1867  /* remove potential on-disk data, and deallocate */
1868  ReorderBufferCleanupTXN(rb, txn);
1869  }
1870  PG_CATCH();
1871  {
1872  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1873  if (iterstate)
1874  ReorderBufferIterTXNFinish(rb, iterstate);
1875 
1877 
1878  /*
1879  * Force cache invalidation to happen outside of a valid transaction
1880  * to prevent catalog access as we just caught an error.
1881  */
1883 
1884  /* make sure there's no cache pollution */
1886 
1887  if (using_subtxn)
1889 
1890  if (snapshot_now->copied)
1891  ReorderBufferFreeSnap(rb, snapshot_now);
1892 
1893  /* remove potential on-disk data, and deallocate */
1894  ReorderBufferCleanupTXN(rb, txn);
1895 
1896  PG_RE_THROW();
1897  }
1898  PG_END_TRY();
1899 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:528
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:3162
bool IsToastRelation(Relation relation)
Definition: catalog.c:141
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:594
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:530
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:395
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4464
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:439
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2088
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
void * palloc0(Size size)
Definition: mcxt.c:980
#define InvalidCommandId
Definition: c.h:531
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:332
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2797
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4359
#define PG_RE_THROW()
Definition: elog.h:363
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
ReorderBufferApplyTruncateCB apply_truncate
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:228
int i
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:322
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1989
#define PG_END_TRY()
Definition: elog.h:347
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 941 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

944 {
945  ReorderBufferTXN *subtxn;
946 
947  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
948  InvalidXLogRecPtr, false);
949 
950  /*
951  * No need to do anything if that subtxn didn't contain any changes
952  */
953  if (!subtxn)
954  return;
955 
956  subtxn->final_lsn = commit_lsn;
957  subtxn->end_lsn = end_lsn;
958 
959  /*
960  * Assign this subxact as a child of the toplevel xact (no-op if already
961  * done.)
962  */
964 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1411 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1413 {
1414  Snapshot snap;
1415  dlist_iter iter;
1416  int i = 0;
1417  Size size;
1418 
1419  size = sizeof(SnapshotData) +
1420  sizeof(TransactionId) * orig_snap->xcnt +
1421  sizeof(TransactionId) * (txn->nsubtxns + 1);
1422 
1423  snap = MemoryContextAllocZero(rb->context, size);
1424  memcpy(snap, orig_snap, sizeof(SnapshotData));
1425 
1426  snap->copied = true;
1427  snap->active_count = 1; /* mark as active so nobody frees it */
1428  snap->regd_count = 0;
1429  snap->xip = (TransactionId *) (snap + 1);
1430 
1431  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1432 
1433  /*
1434  * snap->subxip contains all txids that belong to our transaction which we
1435  * need to check via cmin/cmax. That's why we store the toplevel
1436  * transaction in there as well.
1437  */
1438  snap->subxip = snap->xip + snap->xcnt;
1439  snap->subxip[i++] = txn->xid;
1440 
1441  /*
1442  * subxcnt isn't decreased when subtransactions abort, so count manually.
1443  * Since it's an upper boundary it is safe to use it for the allocation
1444  * above.
1445  */
1446  snap->subxcnt = 1;
1447 
1448  dlist_foreach(iter, &txn->subtxns)
1449  {
1450  ReorderBufferTXN *sub_txn;
1451 
1452  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1453  snap->subxip[i++] = sub_txn->xid;
1454  snap->subxcnt++;
1455  }
1456 
1457  /* sort so we can bsearch() later */
1458  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1459 
1460  /* store the specified current CommandId */
1461  snap->curcid = cid;
1462 
1463  return snap;
1464 }
uint32 TransactionId
Definition: c.h:514
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:199
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:467
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:488
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2242 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

2243 {
2244  int i;
2245 
2246  for (i = 0; i < txn->ninvalidations; i++)
2248 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1998 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

1999 {
2000  ReorderBufferTXN *txn;
2001 
2002  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2003  false);
2004 
2005  /* unknown, nothing to forget */
2006  if (txn == NULL)
2007  return;
2008 
2009  /* cosmetic... */
2010  txn->final_lsn = lsn;
2011 
2012  /*
2013  * Process cache invalidation messages if there are any. Even if we're not
2014  * interested in the transaction's contents, it could have manipulated the
2015  * catalog and we need to update the caches according to that.
2016  */
2017  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2019  txn->invalidations);
2020  else
2021  Assert(txn->ninvalidations == 0);
2022 
2023  /* remove potential on-disk data, and deallocate */
2024  ReorderBufferCleanupTXN(rb, txn);
2025 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:739
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 334 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

335 {
336  MemoryContext context = rb->context;
337 
338  /*
339  * We free separately allocated data by entirely scrapping reorderbuffer's
340  * memory context.
341  */
342  MemoryContextDelete(context);
343 
344  /* Free disk space used by unconsumed reorder buffers */
346 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
ReplicationSlotPersistentData data
Definition: slot.h:132
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:616
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1470 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1471 {
1472  if (snap->copied)
1473  pfree(snap);
1474  else
1476 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1056
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 402 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

403 {
404  ReorderBufferChange *change;
405 
406  change = (ReorderBufferChange *)
408 
409  memset(change, 0, sizeof(ReorderBufferChange));
410  return change;
411 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 766 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

767 {
768  ReorderBufferTXN *txn;
769 
770  AssertTXNLsnOrder(rb);
771 
773  return NULL;
774 
776 
779  return txn;
780 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 794 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

795 {
796  ReorderBufferTXN *txn;
797 
798  AssertTXNLsnOrder(rb);
799 
801  return InvalidTransactionId;
802 
803  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
805  return txn->base_snapshot->xmin;
806 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 514 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

515 {
516  Oid *relids;
517  Size alloc_len;
518 
519  alloc_len = sizeof(Oid) * nrelids;
520 
521  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
522 
523  return relids;
524 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:467
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 478 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

479 {
480  ReorderBufferTupleBuf *tuple;
481  Size alloc_len;
482 
483  alloc_len = tuple_len + SizeofHeapTupleHeader;
484 
485  tuple = (ReorderBufferTupleBuf *)
487  sizeof(ReorderBufferTupleBuf) +
488  MAXIMUM_ALIGNOF + alloc_len);
489  tuple->alloc_tuple_size = alloc_len;
490  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
491 
492  return tuple;
493 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:467
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 352 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

353 {
354  ReorderBufferTXN *txn;
355 
356  txn = (ReorderBufferTXN *)
358 
359  memset(txn, 0, sizeof(ReorderBufferTXN));
360 
361  dlist_init(&txn->changes);
362  dlist_init(&txn->tuplecids);
363  dlist_init(&txn->subtxns);
364 
365  return txn;
366 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2034 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

2036 {
2037  bool use_subtxn = IsTransactionOrTransactionBlock();
2038  int i;
2039 
2040  if (use_subtxn)
2041  BeginInternalSubTransaction("replay");
2042 
2043  /*
2044  * Force invalidations to happen outside of a valid transaction - that way
2045  * entries will just be marked as invalid without accessing the catalog.
2046  * That's advantageous because we don't need to setup the full state
2047  * necessary for catalog access.
2048  */
2049  if (use_subtxn)
2051 
2052  for (i = 0; i < ninvalidations; i++)
2053  LocalExecuteInvalidationMessage(&invalidations[i]);
2054 
2055  if (use_subtxn)
2057 }
void AbortCurrentTransaction(void)
Definition: xact.c:3162
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4464
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4359
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 983 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

984 {
986  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
987  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
988 
989  if (pos_a < pos_b)
990  return 1;
991  else if (pos_a == pos_b)
992  return 0;
993  return -1;
994 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1210 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1212 {
1213  int32 off;
1214 
1215  for (off = 0; off < state->nr_txns; off++)
1216  {
1217  if (state->entries[off].fd != -1)
1218  CloseTransientFile(state->entries[off].fd);
1219  }
1220 
1221  /* free memory we might have "leaked" in the last *Next call */
1222  if (!dlist_is_empty(&state->old_change))
1223  {
1224  ReorderBufferChange *change;
1225 
1226  change = dlist_container(ReorderBufferChange, node,
1227  dlist_pop_head_node(&state->old_change));
1228  ReorderBufferReturnChange(rb, change);
1229  Assert(dlist_is_empty(&state->old_change));
1230  }
1231 
1232  binaryheap_free(state->heap);
1233  pfree(state);
1234 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:347
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

◆ ReorderBufferIterTXNInit()

static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1001 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

1002 {
1003  Size nr_txns = 0;
1005  dlist_iter cur_txn_i;
1006  int32 off;
1007 
1008  /*
1009  * Calculate the size of our heap: one element for every transaction that
1010  * contains changes. (Besides the transactions already in the reorder
1011  * buffer, we count the one we were directly passed.)
1012  */
1013  if (txn->nentries > 0)
1014  nr_txns++;
1015 
1016  dlist_foreach(cur_txn_i, &txn->subtxns)
1017  {
1018  ReorderBufferTXN *cur_txn;
1019 
1020  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1021 
1022  if (cur_txn->nentries > 0)
1023  nr_txns++;
1024  }
1025 
1026  /*
1027  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
1028  * need to allocate/build a heap then.
1029  */
1030 
1031  /* allocate iteration state */
1032  state = (ReorderBufferIterTXNState *)
1034  sizeof(ReorderBufferIterTXNState) +
1035  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1036 
1037  state->nr_txns = nr_txns;
1038  dlist_init(&state->old_change);
1039 
1040  for (off = 0; off < state->nr_txns; off++)
1041  {
1042  state->entries[off].fd = -1;
1043  state->entries[off].segno = 0;
1044  }
1045 
1046  /* allocate heap */
1047  state->heap = binaryheap_allocate(state->nr_txns,
1049  state);
1050 
1051  /*
1052  * Now insert items into the binary heap, in an unordered fashion. (We
1053  * will run a heap assembly step at the end; this is more efficient.)
1054  */
1055 
1056  off = 0;
1057 
1058  /* add toplevel transaction if it contains changes */
1059  if (txn->nentries > 0)
1060  {
1061  ReorderBufferChange *cur_change;
1062 
1063  if (txn->serialized)
1064  {
1065  /* serialize remaining changes */
1066  ReorderBufferSerializeTXN(rb, txn);
1067  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1068  &state->entries[off].segno);
1069  }
1070 
1071  cur_change = dlist_head_element(ReorderBufferChange, node,
1072  &txn->changes);
1073 
1074  state->entries[off].lsn = cur_change->lsn;
1075  state->entries[off].change = cur_change;
1076  state->entries[off].txn = txn;
1077 
1079  }
1080 
1081  /* add subtransactions if they contain changes */
1082  dlist_foreach(cur_txn_i, &txn->subtxns)
1083  {
1084  ReorderBufferTXN *cur_txn;
1085 
1086  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1087 
1088  if (cur_txn->nentries > 0)
1089  {
1090  ReorderBufferChange *cur_change;
1091 
1092  if (cur_txn->serialized)
1093  {
1094  /* serialize remaining changes */
1095  ReorderBufferSerializeTXN(rb, cur_txn);
1096  ReorderBufferRestoreChanges(rb, cur_txn,
1097  &state->entries[off].fd,
1098  &state->entries[off].segno);
1099  }
1100  cur_change = dlist_head_element(ReorderBufferChange, node,
1101  &cur_txn->changes);
1102 
1103  state->entries[off].lsn = cur_change->lsn;
1104  state->entries[off].change = cur_change;
1105  state->entries[off].txn = cur_txn;
1106 
1108  }
1109  }
1110 
1111  /* assemble a valid binary heap */
1112  binaryheap_build(state->heap);
1113 
1114  return state;
1115 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:347
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:467
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1124 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

1125 {
1126  ReorderBufferChange *change;
1128  int32 off;
1129 
1130  /* nothing there anymore */
1131  if (state->heap->bh_size == 0)
1132  return NULL;
1133 
1134  off = DatumGetInt32(binaryheap_first(state->heap));
1135  entry = &state->entries[off];
1136 
1137  /* free memory we might have "leaked" in the previous *Next call */
1138  if (!dlist_is_empty(&state->old_change))
1139  {
1140  change = dlist_container(ReorderBufferChange, node,
1141  dlist_pop_head_node(&state->old_change));
1142  ReorderBufferReturnChange(rb, change);
1143  Assert(dlist_is_empty(&state->old_change));
1144  }
1145 
1146  change = entry->change;
1147 
1148  /*
1149  * update heap with information about which transaction has the next
1150  * relevant change in LSN order
1151  */
1152 
1153  /* there are in-memory changes */
1154  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1155  {
1156  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1157  ReorderBufferChange *next_change =
1158  dlist_container(ReorderBufferChange, node, next);
1159 
1160  /* txn stays the same */
1161  state->entries[off].lsn = next_change->lsn;
1162  state->entries[off].change = next_change;
1163 
1165  return change;
1166  }
1167 
1168  /* try to load changes from disk */
1169  if (entry->txn->nentries != entry->txn->nentries_mem)
1170  {
1171  /*
1172  * Ugly: restoring changes will reuse *Change records, thus delete the
1173  * current one from the per-tx list and only free in the next call.
1174  */
1175  dlist_delete(&change->node);
1176  dlist_push_tail(&state->old_change, &change->node);
1177 
1178  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1179  &state->entries[off].segno))
1180  {
1181  /* successfully restored changes from disk */
1182  ReorderBufferChange *next_change =
1184  &entry->txn->changes);
1185 
1186  elog(DEBUG2, "restored %u/%u changes from disk",
1187  (uint32) entry->txn->nentries_mem,
1188  (uint32) entry->txn->nentries);
1189 
1190  Assert(entry->txn->nentries_mem);
1191  /* txn stays the same */
1192  state->entries[off].lsn = next_change->lsn;
1193  state->entries[off].change = next_change;
1195 
1196  return change;
1197  }
1198  }
1199 
1200  /* ok, no changes there anymore, remove */
1201  binaryheap_remove_first(state->heap);
1202 
1203  return change;
1204 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:213
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:347
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:359
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:228
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 2342 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

2343 {
2344  HASH_SEQ_STATUS hash_seq;
2346  ReorderBufferTXN *largest = NULL;
2347 
2348  hash_seq_init(&hash_seq, rb->by_txn);
2349  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2350  {
2351  ReorderBufferTXN *txn = ent->txn;
2352 
2353  /* if the current transaction is larger, remember it */
2354  if ((!largest) || (txn->size > largest->size))
2355  largest = txn;
2356  }
2357 
2358  Assert(largest);
2359  Assert(largest->size > 0);
2360  Assert(largest->size <= rb->size);
2361 
2362  return largest;
2363 }
#define Assert(condition)
Definition: c.h:739
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
ReorderBufferTXN * txn

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2070 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2071 {
2072  /* many records won't have an xid assigned, centralize check here */
2073  if (xid != InvalidTransactionId)
2074  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2075 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 625 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

627 {
628  ReorderBufferTXN *txn;
629 
630  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
631 
632  change->lsn = lsn;
633  change->txn = txn;
634 
635  Assert(InvalidXLogRecPtr != lsn);
636  dlist_push_tail(&txn->changes, &change->node);
637  txn->nentries++;
638  txn->nentries_mem++;
639 
640  /* update memory accounting information */
641  ReorderBufferChangeMemoryUpdate(rb, change, true);
642 
643  /* check the memory limits and evict something if needed */
645 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 651 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

655 {
656  if (transactional)
657  {
658  MemoryContext oldcontext;
659  ReorderBufferChange *change;
660 
662 
663  oldcontext = MemoryContextSwitchTo(rb->context);
664 
665  change = ReorderBufferGetChange(rb);
667  change->data.msg.prefix = pstrdup(prefix);
668  change->data.msg.message_size = message_size;
669  change->data.msg.message = palloc(message_size);
670  memcpy(change->data.msg.message, message, message_size);
671 
672  ReorderBufferQueueChange(rb, xid, lsn, change);
673 
674  MemoryContextSwitchTo(oldcontext);
675  }
676  else
677  {
678  ReorderBufferTXN *txn = NULL;
679  volatile Snapshot snapshot_now = snapshot;
680 
681  if (xid != InvalidTransactionId)
682  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
683 
684  /* setup snapshot to allow catalog access */
685  SetupHistoricSnapshot(snapshot_now, NULL);
686  PG_TRY();
687  {
688  rb->message(rb, txn, lsn, false, prefix, message_size, message);
689 
691  }
692  PG_CATCH();
693  {
695  PG_RE_THROW();
696  }
697  PG_END_TRY();
698  }
699 }
struct ReorderBufferChange::@101::@104 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:332
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
#define PG_RE_THROW()
Definition: elog.h:363
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:949
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2898 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2900 {
2901  ReorderBufferDiskChange *ondisk;
2902  ReorderBufferChange *change;
2903 
2904  ondisk = (ReorderBufferDiskChange *) data;
2905 
2906  change = ReorderBufferGetChange(rb);
2907 
2908  /* copy static part */
2909  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2910 
2911  data += sizeof(ReorderBufferDiskChange);
2912 
2913  /* restore individual stuff */
2914  switch (change->action)
2915  {
2916  /* fall through these, they're all similar enough */
2921  if (change->data.tp.oldtuple)
2922  {
2923  uint32 tuplelen = ((HeapTuple) data)->t_len;
2924 
2925  change->data.tp.oldtuple =
2927 
2928  /* restore ->tuple */
2929  memcpy(&change->data.tp.oldtuple->tuple, data,
2930  sizeof(HeapTupleData));
2931  data += sizeof(HeapTupleData);
2932 
2933  /* reset t_data pointer into the new tuplebuf */
2934  change->data.tp.oldtuple->tuple.t_data =
2935  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2936 
2937  /* restore tuple data itself */
2938  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2939  data += tuplelen;
2940  }
2941 
2942  if (change->data.tp.newtuple)
2943  {
2944  /* here, data might not be suitably aligned! */
2945  uint32 tuplelen;
2946 
2947  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2948  sizeof(uint32));
2949 
2950  change->data.tp.newtuple =
2952 
2953  /* restore ->tuple */
2954  memcpy(&change->data.tp.newtuple->tuple, data,
2955  sizeof(HeapTupleData));
2956  data += sizeof(HeapTupleData);
2957 
2958  /* reset t_data pointer into the new tuplebuf */
2959  change->data.tp.newtuple->tuple.t_data =
2960  ReorderBufferTupleBufData(change->data.tp.newtuple);
2961 
2962  /* restore tuple data itself */
2963  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2964  data += tuplelen;
2965  }
2966 
2967  break;
2969  {
2970  Size prefix_size;
2971 
2972  /* read prefix */
2973  memcpy(&prefix_size, data, sizeof(Size));
2974  data += sizeof(Size);
2975  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2976  prefix_size);
2977  memcpy(change->data.msg.prefix, data, prefix_size);
2978  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2979  data += prefix_size;
2980 
2981  /* read the message */
2982  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2983  data += sizeof(Size);
2984  change->data.msg.message = MemoryContextAlloc(rb->context,
2985  change->data.msg.message_size);
2986  memcpy(change->data.msg.message, data,
2987  change->data.msg.message_size);
2988  data += change->data.msg.message_size;
2989 
2990  break;
2991  }
2993  {
2994  Snapshot oldsnap;
2995  Snapshot newsnap;
2996  Size size;
2997 
2998  oldsnap = (Snapshot) data;
2999 
3000  size = sizeof(SnapshotData) +
3001  sizeof(TransactionId) * oldsnap->xcnt +
3002  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3003 
3004  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3005 
3006  newsnap = change->data.snapshot;
3007 
3008  memcpy(newsnap, data, size);
3009  newsnap->xip = (TransactionId *)
3010  (((char *) newsnap) + sizeof(SnapshotData));
3011  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3012  newsnap->copied = true;
3013  break;
3014  }
3015  /* the base struct contains all the data, easy peasy */
3017  {
3018  Oid *relids;
3019 
3020  relids = ReorderBufferGetRelids(rb,
3021  change->data.truncate.nrelids);
3022  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3023  change->data.truncate.relids = relids;
3024 
3025  break;
3026  }
3030  break;
3031  }
3032 
3033  dlist_push_tail(&txn->changes, &change->node);
3034  txn->nentries_mem++;
3035 
3036  /*
3037  * Update memory accounting for the restored change. We need to do this
3038  * although we don't check the memory limit when restoring the changes in
3039  * this branch (we only do that when initially queueing the changes after
3040  * decoding), because we will release the changes later, and that will
3041  * update the accounting too (subtracting the size from the counters). And
3042  * we don't want to underflow there.
3043  */
3044  ReorderBufferChangeMemoryUpdate(rb, change, true);
3045 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:359
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:662
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2767 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2769 {
2770  Size restored = 0;
2771  XLogSegNo last_segno;
2772  dlist_mutable_iter cleanup_iter;
2773 
2776 
2777  /* free current entries, so we have memory for more */
2778  dlist_foreach_modify(cleanup_iter, &txn->changes)
2779  {
2781  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2782 
2783  dlist_delete(&cleanup->node);
2784  ReorderBufferReturnChange(rb, cleanup);
2785  }
2786  txn->nentries_mem = 0;
2787  Assert(dlist_is_empty(&txn->changes));
2788 
2789  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2790 
2791  while (restored < max_changes_in_memory && *segno <= last_segno)
2792  {
2793  int readBytes;
2794  ReorderBufferDiskChange *ondisk;
2795 
2796  if (*fd == -1)
2797  {
2798  char path[MAXPGPATH];
2799 
2800  /* first time in */
2801  if (*segno == 0)
2802  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2803 
2804  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2805 
2806  /*
2807  * No need to care about TLIs here, only used during a single run,
2808  * so each LSN only maps to a specific WAL record.
2809  */
2811  *segno);
2812 
2813  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2814  if (*fd < 0 && errno == ENOENT)
2815  {
2816  *fd = -1;
2817  (*segno)++;
2818  continue;
2819  }
2820  else if (*fd < 0)
2821  ereport(ERROR,
2823  errmsg("could not open file \"%s\": %m",
2824  path)));
2825  }
2826 
2827  /*
2828  * Read the statically sized part of a change which has information
2829  * about the total size. If we couldn't read a record, we're at the
2830  * end of this file.
2831  */
2834  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2836 
2837  /* eof */
2838  if (readBytes == 0)
2839  {
2841  *fd = -1;
2842  (*segno)++;
2843  continue;
2844  }
2845  else if (readBytes < 0)
2846  ereport(ERROR,
2848  errmsg("could not read from reorderbuffer spill file: %m")));
2849  else if (readBytes != sizeof(ReorderBufferDiskChange))
2850  ereport(ERROR,
2852  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2853  readBytes,
2854  (uint32) sizeof(ReorderBufferDiskChange))));
2855 
2856  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2857 
2859  sizeof(ReorderBufferDiskChange) + ondisk->size);
2860  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2861 
2863  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2864  ondisk->size - sizeof(ReorderBufferDiskChange));
2866 
2867  if (readBytes < 0)
2868  ereport(ERROR,
2870  errmsg("could not read from reorderbuffer spill file: %m")));
2871  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2872  ereport(ERROR,
2874  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2875  readBytes,
2876  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2877 
2878  /*
2879  * ok, read a full change from disk, now restore it into proper
2880  * in-memory format
2881  */
2882  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2883  restored++;
2884  }
2885 
2886  return restored;
2887 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:359
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:901
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:822
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:13
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3051 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN().

3052 {
3053  XLogSegNo first;
3054  XLogSegNo cur;
3055  XLogSegNo last;
3056 
3059 
3060  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3061  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3062 
3063  /* iterate over all possible filenames, and delete them */
3064  for (cur = first; cur <= last; cur++)
3065  {
3066  char path[MAXPGPATH];
3067 
3069  if (unlink(path) != 0 && errno != ENOENT)
3070  ereport(ERROR,
3072  errmsg("could not remove file \"%s\": %m", path)));
3073  }
3074 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:112
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 417 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

418 {
419  /* update memory accounting info */
420  ReorderBufferChangeMemoryUpdate(rb, change, false);
421 
422  /* free contained data */
423  switch (change->action)
424  {
429  if (change->data.tp.newtuple)
430  {
431  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
432  change->data.tp.newtuple = NULL;
433  }
434 
435  if (change->data.tp.oldtuple)
436  {
437  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
438  change->data.tp.oldtuple = NULL;
439  }
440  break;
442  if (change->data.msg.prefix != NULL)
443  pfree(change->data.msg.prefix);
444  change->data.msg.prefix = NULL;
445  if (change->data.msg.message != NULL)
446  pfree(change->data.msg.message);
447  change->data.msg.message = NULL;
448  break;
450  if (change->data.snapshot)
451  {
452  ReorderBufferFreeSnap(rb, change->data.snapshot);
453  change->data.snapshot = NULL;
454  }
455  break;
456  /* no data in addition to the struct itself */
458  if (change->data.truncate.relids != NULL)
459  {
460  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
461  change->data.truncate.relids = NULL;
462  }
463  break;
467  break;
468  }
469 
470  pfree(change);
471 }
struct ReorderBufferChange::@101::@102 tp
struct ReorderBufferChange::@101::@104 msg
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void pfree(void *pointer)
Definition: mcxt.c:1056
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
union ReorderBufferChange::@101 data
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
struct ReorderBufferChange::@101::@103 truncate

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 530 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

531 {
532  pfree(relids);
533 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 499 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

500 {
501  pfree(tuple);
502 }
void pfree(void *pointer)
Definition: mcxt.c:1056

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 372 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

373 {
374  /* clean the lookup cache if we were cached (quite likely) */
375  if (rb->by_txn_last_xid == txn->xid)
376  {
378  rb->by_txn_last_txn = NULL;
379  }
380 
381  /* free data that's contained */
382 
383  if (txn->tuplecid_hash != NULL)
384  {
386  txn->tuplecid_hash = NULL;
387  }
388 
389  if (txn->invalidations)
390  {
391  pfree(txn->invalidations);
392  txn->invalidations = NULL;
393  }
394 
395  pfree(txn);
396 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1056
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2501 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2503 {
2504  ReorderBufferDiskChange *ondisk;
2505  Size sz = sizeof(ReorderBufferDiskChange);
2506 
2508 
2509  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2510  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2511 
2512  switch (change->action)
2513  {
2514  /* fall through these, they're all similar enough */
2519  {
2520  char *data;
2521  ReorderBufferTupleBuf *oldtup,
2522  *newtup;
2523  Size oldlen = 0;
2524  Size newlen = 0;
2525 
2526  oldtup = change->data.tp.oldtuple;
2527  newtup = change->data.tp.newtuple;
2528 
2529  if (oldtup)
2530  {
2531  sz += sizeof(HeapTupleData);
2532  oldlen = oldtup->tuple.t_len;
2533  sz += oldlen;
2534  }
2535 
2536  if (newtup)
2537  {
2538  sz += sizeof(HeapTupleData);
2539  newlen = newtup->tuple.t_len;
2540  sz += newlen;
2541  }
2542 
2543  /* make sure we have enough space */
2545 
2546  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2547  /* might have been reallocated above */
2548  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2549 
2550  if (oldlen)
2551  {
2552  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2553  data += sizeof(HeapTupleData);
2554 
2555  memcpy(data, oldtup->tuple.t_data, oldlen);
2556  data += oldlen;
2557  }
2558 
2559  if (newlen)
2560  {
2561  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2562  data += sizeof(HeapTupleData);
2563 
2564  memcpy(data, newtup->tuple.t_data, newlen);
2565  data += newlen;
2566  }
2567  break;
2568  }
2570  {
2571  char *data;
2572  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2573 
2574  sz += prefix_size + change->data.msg.message_size +
2575  sizeof(Size) + sizeof(Size);
2577 
2578  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2579 
2580  /* might have been reallocated above */
2581  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2582 
2583  /* write the prefix including the size */
2584  memcpy(data, &prefix_size, sizeof(Size));
2585  data += sizeof(Size);
2586  memcpy(data, change->data.msg.prefix,
2587  prefix_size);
2588  data += prefix_size;
2589 
2590  /* write the message including the size */
2591  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2592  data += sizeof(Size);
2593  memcpy(data, change->data.msg.message,
2594  change->data.msg.message_size);
2595  data += change->data.msg.message_size;
2596 
2597  break;
2598  }
2600  {
2601  Snapshot snap;
2602  char *data;
2603 
2604  snap = change->data.snapshot;
2605 
2606  sz += sizeof(SnapshotData) +
2607  sizeof(TransactionId) * snap->xcnt +
2608  sizeof(TransactionId) * snap->subxcnt
2609  ;
2610 
2611  /* make sure we have enough space */
2613  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2614  /* might have been reallocated above */
2615  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2616 
2617  memcpy(data, snap, sizeof(SnapshotData));
2618  data += sizeof(SnapshotData);
2619 
2620  if (snap->xcnt)
2621  {
2622  memcpy(data, snap->xip,
2623  sizeof(TransactionId) * snap->xcnt);
2624  data += sizeof(TransactionId) * snap->xcnt;
2625  }
2626 
2627  if (snap->subxcnt)
2628  {
2629  memcpy(data, snap->subxip,
2630  sizeof(TransactionId) * snap->subxcnt);
2631  data += sizeof(TransactionId) * snap->subxcnt;
2632  }
2633  break;
2634  }
2636  {
2637  Size size;
2638  char *data;
2639 
2640  /* account for the OIDs of truncated relations */
2641  size = sizeof(Oid) * change->data.truncate.nrelids;
2642  sz += size;
2643 
2644  /* make sure we have enough space */
2646 
2647  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2648  /* might have been reallocated above */
2649  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2650 
2651  memcpy(data, change->data.truncate.relids, size);
2652  data += size;
2653 
2654  break;
2655  }
2659  /* ReorderBufferChange contains everything important */
2660  break;
2661  }
2662 
2663  ondisk->size = sz;
2664 
2665  errno = 0;
2667  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2668  {
2669  int save_errno = errno;
2670 
2672 
2673  /* if write didn't set errno, assume problem is no disk space */
2674  errno = save_errno ? save_errno : ENOSPC;
2675  ereport(ERROR,
2677  errmsg("could not write to data file for XID %u: %m",
2678  txn->xid)));
2679  }
2681 
2682  Assert(ondisk->change.action == change->action);
2683 }
uint32 TransactionId
Definition: c.h:514
struct ReorderBufferChange::@101::@102 tp
#define write(a, b, c)
Definition: win32.h:14
struct ReorderBufferChange::@101::@104 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:68
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:631
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
TransactionId * xip
Definition: snapshot.h:168
int CloseTransientFile(int fd)
Definition: fd.c:2469
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:822
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:180
struct ReorderBufferChange::@101::@103 truncate
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 3120 of file reorderbuffer.c.

References ReplicationSlot::data, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

3122 {
3123  XLogRecPtr recptr;
3124 
3125  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
3126 
3127  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
3129  xid,
3130  (uint32) (recptr >> 32), (uint32) recptr);
3131 }
int wal_segment_size
Definition: xlog.c:112
ReplicationSlotPersistentData data
Definition: slot.h:132
#define MAXPGPATH
unsigned int uint32
Definition: c.h:359
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define NameStr(name)
Definition: c.h:616
#define snprintf
Definition: port.h:192

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2316 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2317 {
2318  if (!rb->outbufsize)
2319  {
2320  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2321  rb->outbufsize = sz;
2322  }
2323  else if (rb->outbufsize < sz)
2324  {
2325  rb->outbuf = repalloc(rb->outbuf, sz);
2326  rb->outbufsize = sz;
2327  }
2328 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2415 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::serialized, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

2416 {
2417  dlist_iter subtxn_i;
2418  dlist_mutable_iter change_i;
2419  int fd = -1;
2420  XLogSegNo curOpenSegNo = 0;
2421  Size spilled = 0;
2422  Size size = txn->size;
2423 
2424  elog(DEBUG2, "spill %u changes in XID %u to disk",
2425  (uint32) txn->nentries_mem, txn->xid);
2426 
2427  /* do the same to all child TXs */
2428  dlist_foreach(subtxn_i, &txn->subtxns)
2429  {
2430  ReorderBufferTXN *subtxn;
2431 
2432  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2433  ReorderBufferSerializeTXN(rb, subtxn);
2434  }
2435 
2436  /* serialize changestream */
2437  dlist_foreach_modify(change_i, &txn->changes)
2438  {
2439  ReorderBufferChange *change;
2440 
2441  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2442 
2443  /*
2444  * store in segment in which it belongs by start lsn, don't split over
2445  * multiple segments tho
2446  */
2447  if (fd == -1 ||
2448  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2449  {
2450  char path[MAXPGPATH];
2451 
2452  if (fd != -1)
2453  CloseTransientFile(fd);
2454 
2455  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2456 
2457  /*
2458  * No need to care about TLIs here, only used during a single run,
2459  * so each LSN only maps to a specific WAL record.
2460  */
2462  curOpenSegNo);
2463 
2464  /* open segment, create it if necessary */
2465  fd = OpenTransientFile(path,
2466  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2467 
2468  if (fd < 0)
2469  ereport(ERROR,
2471  errmsg("could not open file \"%s\": %m", path)));
2472  }
2473 
2474  ReorderBufferSerializeChange(rb, txn, fd, change);
2475  dlist_delete(&change->node);
2476  ReorderBufferReturnChange(rb, change);
2477 
2478  spilled++;
2479  }
2480 
2481  /* update the statistics */
2482  rb->spillCount += 1;
2483  rb->spillBytes += size;
2484 
2485  /* Don't consider already serialized transaction. */
2486  rb->spillTxns += txn->serialized ? 0 : 1;
2487 
2488  Assert(spilled == txn->nentries_mem);
2489  Assert(dlist_is_empty(&txn->changes));
2490  txn->nentries_mem = 0;
2491  txn->serialized = true;
2492 
2493  if (fd != -1)
2494  CloseTransientFile(fd);
2495 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
int wal_segment_size
Definition: xlog.c:112
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:359
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:467
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2101 of file reorderbuffer.c.

References Assert, AssertArg, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

2103 {
2104  ReorderBufferTXN *txn;
2105  bool is_new;
2106 
2107  AssertArg(snap != NULL);
2108 
2109  /*
2110  * Fetch the transaction to operate on. If we know it's a subtransaction,
2111  * operate on its top-level transaction instead.
2112  */
2113  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2114  if (txn->is_known_as_subxact)
2115  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2116  NULL, InvalidXLogRecPtr, false);
2117  Assert(txn->base_snapshot == NULL);
2118 
2119  txn->base_snapshot = snap;
2120  txn->base_snapshot_lsn = lsn;
2122 
2123  AssertTXNLsnOrder(rb);
2124 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node base_snapshot_node
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
dlist_head txns_by_base_snapshot_lsn
#define AssertArg(condition)
Definition: c.h:741
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:739
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
TransactionId toplevel_xid

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 809 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

810 {
812 }
XLogRecPtr current_restart_decoding_lsn

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 3193 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

3195 {
3196  ReorderBufferToastEnt *ent;
3197  ReorderBufferTupleBuf *newtup;
3198  bool found;
3199  int32 chunksize;
3200  bool isnull;
3201  Pointer chunk;
3202  TupleDesc desc = RelationGetDescr(relation);
3203  Oid chunk_id;
3204  int32 chunk_seq;
3205 
3206  if (txn->toast_hash == NULL)
3207  ReorderBufferToastInitHash(rb, txn);
3208 
3209  Assert(IsToastRelation(relation));
3210 
3211  newtup = change->data.tp.newtuple;
3212  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
3213  Assert(!isnull);
3214  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
3215  Assert(!isnull);
3216 
3217  ent = (ReorderBufferToastEnt *)
3218  hash_search(txn->toast_hash,
3219  (void *) &chunk_id,
3220  HASH_ENTER,
3221  &found);
3222 
3223  if (!found)
3224  {
3225  Assert(ent->chunk_id == chunk_id);
3226  ent->num_chunks = 0;
3227  ent->last_chunk_seq = 0;
3228  ent->size = 0;
3229  ent->reconstructed = NULL;
3230  dlist_init(&ent->chunks);
3231 
3232  if (chunk_seq != 0)
3233  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3234  chunk_seq, chunk_id);
3235  }
3236  else if (found && chunk_seq != ent->last_chunk_seq + 1)
3237  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3238  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3239 
3240  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3241  Assert(!isnull);
3242 
3243  /* calculate size so we can allocate the right size at once later */
3244  if (!VARATT_IS_EXTENDED(chunk))
3245  chunksize = VARSIZE(chunk) - VARHDRSZ;
3246  else if (VARATT_IS_SHORT(chunk))
3247  /* could happen due to heap_form_tuple doing its thing */
3248  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3249  else
3250  elog(ERROR, "unexpected type of toast chunk");
3251 
3252  ent->size += chunksize;
3253  ent->last_chunk_seq = chunk_seq;
3254  ent->num_chunks++;
3255  dlist_push_tail(&ent->chunks, &change->node);
3256 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:141
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:448
struct ReorderBufferChange::@101::@102 tp
#define VARHDRSZ_SHORT
Definition: postgres.h:268
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARHDRSZ
Definition: c.h:562
#define DatumGetObjectId(X)
Definition: postgres.h:500
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:347
char * Pointer
Definition: c.h:336
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:29
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define Assert(condition)
Definition: c.h:739
union ReorderBufferChange::@101 data
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define DatumGetPointer(X)
Definition: postgres.h:549
#define elog(elevel,...)
Definition: elog.h:228
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3172 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

3173 {
3174  HASHCTL hash_ctl;
3175 
3176  Assert(txn->toast_hash == NULL);
3177 
3178  memset(&hash_ctl, 0, sizeof(hash_ctl));
3179  hash_ctl.keysize = sizeof(Oid);
3180  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
3181  hash_ctl.hcxt = rb->context;
3182  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
3184 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:739

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 3279 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, elog, ERROR, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

3281 {
3282  TupleDesc desc;
3283  int natt;
3284  Datum *attrs;
3285  bool *isnull;
3286  bool *free;
3287  HeapTuple tmphtup;
3288  Relation toast_rel;
3289  TupleDesc toast_desc;
3290  MemoryContext oldcontext;
3291  ReorderBufferTupleBuf *newtup;
3292 
3293  /* no toast tuples changed */
3294  if (txn->toast_hash == NULL)
3295  return;
3296 
3297  /*
3298  * We're going to modify the size of the change, so to make sure the
3299  * accounting is correct we'll make it look like we're removing the change
3300  * now (with the old size), and then re-add it at the end.
3301  */
3302  ReorderBufferChangeMemoryUpdate(rb, change, false);
3303 
3304  oldcontext = MemoryContextSwitchTo(rb->context);
3305 
3306  /* we should only have toast tuples in an INSERT or UPDATE */
3307  Assert(change->data.tp.newtuple);
3308 
3309  desc = RelationGetDescr(relation);
3310 
3311  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3312  if (!RelationIsValid(toast_rel))
3313  elog(ERROR, "could not open relation with OID %u",
3314  relation->rd_rel->reltoastrelid);
3315 
3316  toast_desc = RelationGetDescr(toast_rel);
3317 
3318  /* should we allocate from stack instead? */
3319  attrs = palloc0(sizeof(Datum) * desc->natts);
3320  isnull = palloc0(sizeof(bool) * desc->natts);
3321  free = palloc0(sizeof(bool) * desc->natts);
3322 
3323  newtup = change->data.tp.newtuple;
3324 
3325  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3326 
3327  for (natt = 0; natt < desc->natts; natt++)
3328  {
3329  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3330  ReorderBufferToastEnt *ent;
3331  struct varlena *varlena;
3332 
3333  /* va_rawsize is the size of the original datum -- including header */
3334  struct varatt_external toast_pointer;
3335  struct varatt_indirect redirect_pointer;
3336  struct varlena *new_datum = NULL;
3337  struct varlena *reconstructed;
3338  dlist_iter it;
3339  Size data_done = 0;
3340 
3341  /* system columns aren't toasted */
3342  if (attr->attnum < 0)
3343  continue;
3344 
3345  if (attr->attisdropped)
3346  continue;
3347 
3348  /* not a varlena datatype */
3349  if (attr->attlen != -1)
3350  continue;
3351 
3352  /* no data */
3353  if (isnull[natt])
3354  continue;
3355 
3356  /* ok, we know we have a toast datum */
3357  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3358 
3359  /* no need to do anything if the tuple isn't external */
3360  if (!VARATT_IS_EXTERNAL(varlena))
3361  continue;
3362 
3363  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3364 
3365  /*
3366  * Check whether the toast tuple changed, replace if so.
3367  */
3368  ent = (ReorderBufferToastEnt *)
3369  hash_search(txn->toast_hash,
3370  (void *) &toast_pointer.va_valueid,
3371  HASH_FIND,
3372  NULL);
3373  if (ent == NULL)
3374  continue;
3375 
3376  new_datum =
3377  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3378 
3379  free[natt] = true;
3380 
3381  reconstructed = palloc0(toast_pointer.va_rawsize);
3382 
3383  ent->reconstructed = reconstructed;
3384 
3385  /* stitch toast tuple back together from its parts */
3386  dlist_foreach(it, &ent->chunks)
3387  {
3388  bool isnull;
3389  ReorderBufferChange *cchange;
3390  ReorderBufferTupleBuf *ctup;
3391  Pointer chunk;
3392 
3393  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3394  ctup = cchange->data.tp.newtuple;
3395  chunk = DatumGetPointer(
3396  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3397 
3398  Assert(!isnull);
3399  Assert(!VARATT_IS_EXTERNAL(chunk));
3400  Assert(!VARATT_IS_SHORT(chunk));
3401 
3402  memcpy(VARDATA(reconstructed) + data_done,
3403  VARDATA(chunk),
3404  VARSIZE(chunk) - VARHDRSZ);
3405  data_done += VARSIZE(chunk) - VARHDRSZ;
3406  }
3407  Assert(data_done == toast_pointer.va_extsize);
3408 
3409  /* make sure its marked as compressed or not */
3410  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3411  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3412  else
3413  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3414 
3415  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3416  redirect_pointer.pointer = reconstructed;
3417 
3419  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3420  sizeof(redirect_pointer));
3421 
3422  attrs[natt] = PointerGetDatum(new_datum);
3423  }
3424 
3425  /*
3426  * Build tuple in separate memory & copy tuple back into the tuplebuf
3427  * passed to the output plugin. We can't directly heap_fill_tuple() into
3428  * the tuplebuf because attrs[] will point back into the current content.
3429  */
3430  tmphtup = heap_form_tuple(desc, attrs, isnull);
3431  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3432  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3433 
3434  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3435  newtup->tuple.t_len = tmphtup->t_len;
3436 
3437  /*
3438  * free resources we won't further need, more persistent stuff will be
3439  * free'd in ReorderBufferToastReset().
3440  */
3441  RelationClose(toast_rel);
3442  pfree(tmphtup);
3443  for (natt = 0; natt < desc->natts; natt++)
3444  {
3445  if (free[natt])
3446  pfree(DatumGetPointer(attrs[natt]));
3447  }
3448  pfree(attrs);
3449  pfree(free);
3450  pfree(isnull);
3451 
3452  MemoryContextSwitchTo(oldcontext);
3453 
3454  /* now add the change back, with the correct size */
3455  ReorderBufferChangeMemoryUpdate(rb, change, true);
3456 }
#define VARDATA(PTR)
Definition: postgres.h:302
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
#define RelationGetDescr(relation)
Definition: rel.h:448
struct ReorderBufferChange::@101::@102 tp
#define VARSIZE(PTR)
Definition: