PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirm(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirm

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4499 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4500 {
4501  char path[MAXPGPATH];
4502  int fd;
4503  int readBytes;
4505 
4506  sprintf(path, "pg_logical/mappings/%s", fname);
4507  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4508  if (fd < 0)
4509  ereport(ERROR,
4511  errmsg("could not open file \"%s\": %m", path)));
4512 
4513  while (true)
4514  {
4517  ReorderBufferTupleCidEnt *new_ent;
4518  bool found;
4519 
4520  /* be careful about padding */
4521  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4522 
4523  /* read all mappings till the end of the file */
4525  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4527 
4528  if (readBytes < 0)
4529  ereport(ERROR,
4531  errmsg("could not read file \"%s\": %m",
4532  path)));
4533  else if (readBytes == 0) /* EOF */
4534  break;
4535  else if (readBytes != sizeof(LogicalRewriteMappingData))
4536  ereport(ERROR,
4538  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4539  path, readBytes,
4540  (int32) sizeof(LogicalRewriteMappingData))));
4541 
4542  key.relnode = map.old_node;
4543  ItemPointerCopy(&map.old_tid,
4544  &key.tid);
4545 
4546 
4547  ent = (ReorderBufferTupleCidEnt *)
4548  hash_search(tuplecid_data,
4549  (void *) &key,
4550  HASH_FIND,
4551  NULL);
4552 
4553  /* no existing mapping, no need to update */
4554  if (!ent)
4555  continue;
4556 
4557  key.relnode = map.new_node;
4558  ItemPointerCopy(&map.new_tid,
4559  &key.tid);
4560 
4561  new_ent = (ReorderBufferTupleCidEnt *)
4562  hash_search(tuplecid_data,
4563  (void *) &key,
4564  HASH_ENTER,
4565  &found);
4566 
4567  if (found)
4568  {
4569  /*
4570  * Make sure the existing mapping makes sense. We sometime update
4571  * old records that did not yet have a cmax (e.g. pg_class' own
4572  * entry while rewriting it) during rewrites, so allow that.
4573  */
4574  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4575  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4576  }
4577  else
4578  {
4579  /* update mapping */
4580  new_ent->cmin = ent->cmin;
4581  new_ent->cmax = ent->cmax;
4582  new_ent->combocid = ent->combocid;
4583  }
4584  }
4585 
4586  if (CloseTransientFile(fd) != 0)
4587  ereport(ERROR,
4589  errmsg("could not close file \"%s\": %m", path)));
4590 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1267
signed int int32
Definition: c.h:417
#define sprintf
Definition: port.h:217
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2399
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:714
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1466
int CloseTransientFile(int fd)
Definition: fd.c:2576
#define InvalidCommandId
Definition: c.h:592
#define ereport(elevel,...)
Definition: elog.h:155
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:800
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1442
int errmsg(const char *fmt,...)
Definition: elog.c:902
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 905 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

906 {
907 #ifdef USE_ASSERT_CHECKING
908  dlist_iter iter;
909  XLogRecPtr prev_lsn = txn->first_lsn;
910 
911  dlist_foreach(iter, &txn->changes)
912  {
913  ReorderBufferChange *cur_change;
914 
915  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
916 
918  Assert(cur_change->lsn != InvalidXLogRecPtr);
919  Assert(txn->first_lsn <= cur_change->lsn);
920 
921  if (txn->end_lsn != InvalidXLogRecPtr)
922  Assert(cur_change->lsn <= txn->end_lsn);
923 
924  Assert(prev_lsn <= cur_change->lsn);
925 
926  prev_lsn = cur_change->lsn;
927  }
928 #endif
929 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 848 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

849 {
850 #ifdef USE_ASSERT_CHECKING
851  dlist_iter iter;
852  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
853  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
854 
855  dlist_foreach(iter, &rb->toplevel_by_lsn)
856  {
858  iter.cur);
859 
860  /* start LSN must be set */
861  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
862 
863  /* If there is an end LSN, it must be higher than start LSN */
864  if (cur_txn->end_lsn != InvalidXLogRecPtr)
865  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
866 
867  /* Current initial LSN must be strictly higher than previous */
868  if (prev_first_lsn != InvalidXLogRecPtr)
869  Assert(prev_first_lsn < cur_txn->first_lsn);
870 
871  /* known-as-subtxn txns must not be listed */
872  Assert(!rbtxn_is_known_subxact(cur_txn));
873 
874  prev_first_lsn = cur_txn->first_lsn;
875  }
876 
878  {
880  base_snapshot_node,
881  iter.cur);
882 
883  /* base snapshot (and its LSN) must be set */
884  Assert(cur_txn->base_snapshot != NULL);
886 
887  /* current LSN must be strictly higher than previous */
888  if (prev_base_snap_lsn != InvalidXLogRecPtr)
889  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
890 
891  /* known-as-subtxn txns must not be listed */
892  Assert(!rbtxn_is_known_subxact(cur_txn));
893 
894  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
895  }
896 #endif
897 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 4607 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4608 {
4611 
4612  if (a->lsn < b->lsn)
4613  return -1;
4614  else if (a->lsn > b->lsn)
4615  return 1;
4616  return 0;
4617 }
#define lfirst(lc)
Definition: pg_list.h:169

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2489 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2490 {
2491  ReorderBufferTXN *txn;
2492 
2493  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2494  false);
2495 
2496  /* unknown, nothing to remove */
2497  if (txn == NULL)
2498  return;
2499 
2500  /* For streamed transactions notify the remote node about the abort. */
2501  if (rbtxn_is_streamed(txn))
2502  {
2503  rb->stream_abort(rb, txn, lsn);
2504 
2505  /*
2506  * We might have decoded changes for this transaction that could load
2507  * the cache as per the current transaction's view (consider DDL's
2508  * happened in this transaction). We don't want the decoding of future
2509  * transactions to use those cache entries so execute invalidations.
2510  */
2511  if (txn->ninvalidations > 0)
2513  txn->invalidations);
2514  }
2515 
2516  /* cosmetic... */
2517  txn->final_lsn = lsn;
2518 
2519  /* remove potential on-disk data, and deallocate */
2520  ReorderBufferCleanupTXN(rb, txn);
2521 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2531 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2532 {
2533  dlist_mutable_iter it;
2534 
2535  /*
2536  * Iterate through all (potential) toplevel TXNs and abort all that are
2537  * older than what possibly can be running. Once we've found the first
2538  * that is alive we stop, there might be some that acquired an xid earlier
2539  * but started writing later, but it's unlikely and they will be cleaned
2540  * up in a later call to this function.
2541  */
2543  {
2544  ReorderBufferTXN *txn;
2545 
2546  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2547 
2548  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2549  {
2550  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2551 
2552  /* remove potential on-disk data, and deallocate this tx */
2553  ReorderBufferCleanupTXN(rb, txn);
2554  }
2555  else
2556  return;
2557  }
2558 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:228

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 2835 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

2838 {
2839  ReorderBufferTXN *txn;
2840  MemoryContext oldcontext;
2841  ReorderBufferChange *change;
2842 
2843  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2844 
2845  oldcontext = MemoryContextSwitchTo(rb->context);
2846 
2847  /*
2848  * Collect all the invalidations under the top transaction so that we can
2849  * execute them all together. See comment atop this function
2850  */
2851  if (txn->toptxn)
2852  txn = txn->toptxn;
2853 
2854  Assert(nmsgs > 0);
2855 
2856  /* Accumulate invalidations. */
2857  if (txn->ninvalidations == 0)
2858  {
2859  txn->ninvalidations = nmsgs;
2861  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2862  memcpy(txn->invalidations, msgs,
2863  sizeof(SharedInvalidationMessage) * nmsgs);
2864  }
2865  else
2866  {
2869  (txn->ninvalidations + nmsgs));
2870 
2871  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2872  nmsgs * sizeof(SharedInvalidationMessage));
2873  txn->ninvalidations += nmsgs;
2874  }
2875 
2876  change = ReorderBufferGetChange(rb);
2878  change->data.inval.ninvalidations = nmsgs;
2879  change->data.inval.invalidations = (SharedInvalidationMessage *)
2880  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2881  memcpy(change->data.inval.invalidations, msgs,
2882  sizeof(SharedInvalidationMessage) * nmsgs);
2883 
2884  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2885 
2886  MemoryContextSwitchTo(oldcontext);
2887 }
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct ReorderBufferChange::@98::@103 inval
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
struct ReorderBufferTXN * toptxn
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:800
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2712 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2714 {
2716 
2717  change->data.command_id = cid;
2719 
2720  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2721 }
union ReorderBufferChange::@98 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2799 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2803 {
2805  ReorderBufferTXN *txn;
2806 
2807  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2808 
2809  change->data.tuplecid.node = node;
2810  change->data.tuplecid.tid = tid;
2811  change->data.tuplecid.cmin = cmin;
2812  change->data.tuplecid.cmax = cmax;
2813  change->data.tuplecid.combocid = combocid;
2814  change->lsn = lsn;
2815  change->txn = txn;
2817 
2818  dlist_push_tail(&txn->tuplecids, &change->node);
2819  txn->ntuplecids++;
2820 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
union ReorderBufferChange::@98 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
struct ReorderBufferChange::@98::@102 tuplecid
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2663 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2665 {
2667 
2668  change->data.snapshot = snap;
2670 
2671  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2672 }
union ReorderBufferChange::@98 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 298 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
308  "ReorderBuffer",
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
326  sizeof(ReorderBufferTXN));
327 
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
331 
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338 
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
346  buffer->spillTxns = 0;
347  buffer->spillCount = 0;
348  buffer->spillBytes = 0;
349  buffer->streamTxns = 0;
350  buffer->streamCount = 0;
351  buffer->streamBytes = 0;
352 
354 
355  dlist_init(&buffer->toplevel_by_lsn);
357 
358  /*
359  * Ensure there's no stale data from prior uses of this slot, in case some
360  * prior exit avoided calling ReorderBufferFree. Failure to do this can
361  * produce duplicated txns, and it's very cheap if there's nothing there.
362  */
364 
365  return buffer;
366 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
uint32 TransactionId
Definition: c.h:575
MemoryContext hcxt
Definition: hsearch.h:77
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:72
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:71
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:800
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define NameStr(name)
Definition: c.h:677
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1829 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1832 {
1833  if (streaming)
1834  rb->stream_change(rb, txn, relation, change);
1835  else
1836  rb->apply_change(rb, txn, relation, change);
1837 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1857 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1859 {
1860  if (streaming)
1861  rb->stream_message(rb, txn, change->lsn, true,
1862  change->data.msg.prefix,
1863  change->data.msg.message_size,
1864  change->data.msg.message);
1865  else
1866  rb->message(rb, txn, change->lsn, true,
1867  change->data.msg.prefix,
1868  change->data.msg.message_size,
1869  change->data.msg.message);
1870 }
union ReorderBufferChange::@98 data
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
struct ReorderBufferChange::@98::@101 msg

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1843 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1846 {
1847  if (streaming)
1848  rb->stream_truncate(rb, txn, nrelations, relations, change);
1849  else
1850  rb->apply_truncate(rb, txn, nrelations, relations, change);
1851 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 991 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

993 {
994  ReorderBufferTXN *txn;
995  ReorderBufferTXN *subtxn;
996  bool new_top;
997  bool new_sub;
998 
999  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1000  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1001 
1002  if (!new_sub)
1003  {
1004  if (rbtxn_is_known_subxact(subtxn))
1005  {
1006  /* already associated, nothing to do */
1007  return;
1008  }
1009  else
1010  {
1011  /*
1012  * We already saw this transaction, but initially added it to the
1013  * list of top-level txns. Now that we know it's not top-level,
1014  * remove it from there.
1015  */
1016  dlist_delete(&subtxn->node);
1017  }
1018  }
1019 
1020  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1021  subtxn->toplevel_xid = xid;
1022  Assert(subtxn->nsubtxns == 0);
1023 
1024  /* set the reference to top-level transaction */
1025  subtxn->toptxn = txn;
1026 
1027  /* add to subtransaction list */
1028  dlist_push_tail(&txn->subtxns, &subtxn->node);
1029  txn->nsubtxns++;
1030 
1031  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1032  ReorderBufferTransferSnapToParent(txn, subtxn);
1033 
1034  /* Verify LSN-ordering invariant */
1035  AssertTXNLsnOrder(rb);
1036 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:800
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1614 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1615 {
1616  dlist_iter iter;
1617  HASHCTL hash_ctl;
1618 
1620  return;
1621 
1622  memset(&hash_ctl, 0, sizeof(hash_ctl));
1623 
1624  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1625  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1626  hash_ctl.hcxt = rb->context;
1627 
1628  /*
1629  * create the hash with the exact number of to-be-stored tuplecids from
1630  * the start
1631  */
1632  txn->tuplecid_hash =
1633  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1635 
1636  dlist_foreach(iter, &txn->tuplecids)
1637  {
1640  bool found;
1641  ReorderBufferChange *change;
1642 
1643  change = dlist_container(ReorderBufferChange, node, iter.cur);
1644 
1646 
1647  /* be careful about padding */
1648  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1649 
1650  key.relnode = change->data.tuplecid.node;
1651 
1652  ItemPointerCopy(&change->data.tuplecid.tid,
1653  &key.tid);
1654 
1655  ent = (ReorderBufferTupleCidEnt *)
1657  (void *) &key,
1659  &found);
1660  if (!found)
1661  {
1662  ent->cmin = change->data.tuplecid.cmin;
1663  ent->cmax = change->data.tuplecid.cmax;
1664  ent->combocid = change->data.tuplecid.combocid;
1665  }
1666  else
1667  {
1668  /*
1669  * Maybe we already saw this tuple before in this transaction, but
1670  * if so it must have the same cmin.
1671  */
1672  Assert(ent->cmin == change->data.tuplecid.cmin);
1673 
1674  /*
1675  * cmax may be initially invalid, but once set it can only grow,
1676  * and never become invalid again.
1677  */
1678  Assert((ent->cmax == InvalidCommandId) ||
1679  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1680  (change->data.tuplecid.cmax > ent->cmax)));
1681  ent->cmax = change->data.tuplecid.cmax;
1682  }
1683  }
1684 }
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
MemoryContext hcxt
Definition: hsearch.h:77
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
union ReorderBufferChange::@98 data
Size entrysize
Definition: hsearch.h:72
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext context
#define InvalidCommandId
Definition: c.h:592
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
Size keysize
Definition: hsearch.h:71
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_has_catalog_changes(txn)
struct ReorderBufferChange::@98::@102 tuplecid
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3459 of file reorderbuffer.c.

References Assert, XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3460 {
3462  SnapBuild *builder = ctx->snapshot_builder;
3463 
3464  /*
3465  * We can't start streaming immediately even if the streaming is enabled
3466  * because we previously decoded this transaction and now just are
3467  * restarting.
3468  */
3469  if (ReorderBufferCanStream(rb) &&
3470  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3471  {
3472  /* We must have a consistent snapshot by this time */
3474  return true;
3475  }
3476 
3477  return false;
3478 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:404
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:395
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
#define Assert(condition)
Definition: c.h:800
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 2737 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferCanStream(), ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

2740 {
2741  Size sz;
2742  ReorderBufferTXN *txn;
2743  ReorderBufferTXN *toptxn = NULL;
2744 
2745  Assert(change->txn);
2746 
2747  /*
2748  * Ignore tuple CID changes, because those are not evicted when reaching
2749  * memory limit. So we just don't count them, because it might easily
2750  * trigger a pointless attempt to spill.
2751  */
2753  return;
2754 
2755  txn = change->txn;
2756 
2757  /* If streaming supported, update the total size in top level as well. */
2758  if (ReorderBufferCanStream(rb))
2759  {
2760  if (txn->toptxn != NULL)
2761  toptxn = txn->toptxn;
2762  else
2763  toptxn = txn;
2764  }
2765 
2766  sz = ReorderBufferChangeSize(change);
2767 
2768  if (addition)
2769  {
2770  txn->size += sz;
2771  rb->size += sz;
2772 
2773  /* Update the total size in the top transaction. */
2774  if (toptxn)
2775  toptxn->total_size += sz;
2776  }
2777  else
2778  {
2779  Assert((rb->size >= sz) && (txn->size >= sz));
2780  txn->size -= sz;
2781  rb->size -= sz;
2782 
2783  /* Update the total size in the top transaction. */
2784  if (toptxn)
2785  toptxn->total_size -= sz;
2786  }
2787 
2788  Assert(txn->size <= rb->size);
2789 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 3599 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

3600 {
3601  Size sz = sizeof(ReorderBufferChange);
3602 
3603  switch (change->action)
3604  {
3605  /* fall through these, they're all similar enough */
3610  {
3611  ReorderBufferTupleBuf *oldtup,
3612  *newtup;
3613  Size oldlen = 0;
3614  Size newlen = 0;
3615 
3616  oldtup = change->data.tp.oldtuple;
3617  newtup = change->data.tp.newtuple;
3618 
3619  if (oldtup)
3620  {
3621  sz += sizeof(HeapTupleData);
3622  oldlen = oldtup->tuple.t_len;
3623  sz += oldlen;
3624  }
3625 
3626  if (newtup)
3627  {
3628  sz += sizeof(HeapTupleData);
3629  newlen = newtup->tuple.t_len;
3630  sz += newlen;
3631  }
3632 
3633  break;
3634  }
3636  {
3637  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3638 
3639  sz += prefix_size + change->data.msg.message_size +
3640  sizeof(Size) + sizeof(Size);
3641 
3642  break;
3643  }
3645  {
3646  sz += sizeof(SharedInvalidationMessage) *
3647  change->data.inval.ninvalidations;
3648  break;
3649  }
3651  {
3652  Snapshot snap;
3653 
3654  snap = change->data.snapshot;
3655 
3656  sz += sizeof(SnapshotData) +
3657  sizeof(TransactionId) * snap->xcnt +
3658  sizeof(TransactionId) * snap->subxcnt;
3659 
3660  break;
3661  }
3663  {
3664  sz += sizeof(Oid) * change->data.truncate.nrelids;
3665 
3666  break;
3667  }
3671  /* ReorderBufferChange contains everything important */
3672  break;
3673  }
3674 
3675  return sz;
3676 }
uint32 TransactionId
Definition: c.h:575
union ReorderBufferChange::@98 data
struct ReorderBufferChange::@98::@103 inval
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
size_t Size
Definition: c.h:528
uint32 xcnt
Definition: snapshot.h:169
struct ReorderBufferChange::@98::@100 truncate
struct ReorderBufferChange::@98::@99 tp
struct ReorderBufferChange::@98::@101 msg
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3082 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3083 {
3084  ReorderBufferTXN *txn;
3085 
3086  /* bail out if we haven't exceeded the memory limit */
3087  if (rb->size < logical_decoding_work_mem * 1024L)
3088  return;
3089 
3090  /*
3091  * Loop until we reach under the memory limit. One might think that just
3092  * by evicting the largest (sub)transaction we will come under the memory
3093  * limit based on assumption that the selected transaction is at least as
3094  * large as the most recent change (which caused us to go over the memory
3095  * limit). However, that is not true because a user can reduce the
3096  * logical_decoding_work_mem to a smaller value before the most recent
3097  * change.
3098  */
3099  while (rb->size >= logical_decoding_work_mem * 1024L)
3100  {
3101  /*
3102  * Pick the largest transaction (or subtransaction) and evict it from
3103  * memory by streaming, if possible. Otherwise, spill to disk.
3104  */
3106  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3107  {
3108  /* we know there has to be one, because the size is not zero */
3109  Assert(txn && !txn->toptxn);
3110  Assert(txn->total_size > 0);
3111  Assert(rb->size >= txn->total_size);
3112 
3113  ReorderBufferStreamTXN(rb, txn);
3114  }
3115  else
3116  {
3117  /*
3118  * Pick the largest transaction (or subtransaction) and evict it
3119  * from memory by serializing it to disk.
3120  */
3121  txn = ReorderBufferLargestTXN(rb);
3122 
3123  /* we know there has to be one, because the size is not zero */
3124  Assert(txn);
3125  Assert(txn->size > 0);
3126  Assert(rb->size >= txn->size);
3127 
3128  ReorderBufferSerializeTXN(rb, txn);
3129  }
3130 
3131  /*
3132  * After eviction, the transaction should have no entries in memory,
3133  * and should use 0 bytes for changes.
3134  */
3135  Assert(txn->size == 0);
3136  Assert(txn->nentries_mem == 0);
3137  }
3138 
3139  /* We must be under the memory limit now. */
3140  Assert(rb->size < logical_decoding_work_mem * 1024L);
3141 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:800
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4020 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

4021 {
4022  DIR *spill_dir;
4023  struct dirent *spill_de;
4024  struct stat statbuf;
4025  char path[MAXPGPATH * 2 + 12];
4026 
4027  sprintf(path, "pg_replslot/%s", slotname);
4028 
4029  /* we're only handling directories here, skip if it's not ours */
4030  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4031  return;
4032 
4033  spill_dir = AllocateDir(path);
4034  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4035  {
4036  /* only look at names that can be ours */
4037  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4038  {
4039  snprintf(path, sizeof(path),
4040  "pg_replslot/%s/%s", slotname,
4041  spill_de->d_name);
4042 
4043  if (unlink(path) != 0)
4044  ereport(ERROR,
4046  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4047  path, slotname)));
4048  }
4049  }
4050  FreeDir(spill_dir);
4051 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2691
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:217
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:714
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2610
#define ereport(elevel,...)
Definition: elog.h:155
#define S_ISDIR(m)
Definition: win32_port.h:316
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:902
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:215
int FreeDir(DIR *dir)
Definition: fd.c:2728

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1421 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), ReorderBufferForget(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

1422 {
1423  bool found;
1424  dlist_mutable_iter iter;
1425 
1426  /* cleanup subtransactions & their changes */
1427  dlist_foreach_modify(iter, &txn->subtxns)
1428  {
1429  ReorderBufferTXN *subtxn;
1430 
1431  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1432 
1433  /*
1434  * Subtransactions are always associated to the toplevel TXN, even if
1435  * they originally were happening inside another subtxn, so we won't
1436  * ever recurse more than one level deep here.
1437  */
1438  Assert(rbtxn_is_known_subxact(subtxn));
1439  Assert(subtxn->nsubtxns == 0);
1440 
1441  ReorderBufferCleanupTXN(rb, subtxn);
1442  }
1443 
1444  /* cleanup changes in the txn */
1445  dlist_foreach_modify(iter, &txn->changes)
1446  {
1447  ReorderBufferChange *change;
1448 
1449  change = dlist_container(ReorderBufferChange, node, iter.cur);
1450 
1451  /* Check we're not mixing changes from different transactions. */
1452  Assert(change->txn == txn);
1453 
1454  ReorderBufferReturnChange(rb, change, true);
1455  }
1456 
1457  /*
1458  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1459  * They are always stored in the toplevel transaction.
1460  */
1461  dlist_foreach_modify(iter, &txn->tuplecids)
1462  {
1463  ReorderBufferChange *change;
1464 
1465  change = dlist_container(ReorderBufferChange, node, iter.cur);
1466 
1467  /* Check we're not mixing changes from different transactions. */
1468  Assert(change->txn == txn);
1470 
1471  ReorderBufferReturnChange(rb, change, true);
1472  }
1473 
1474  /*
1475  * Cleanup the base snapshot, if set.
1476  */
1477  if (txn->base_snapshot != NULL)
1478  {
1481  }
1482 
1483  /*
1484  * Cleanup the snapshot for the last streamed run.
1485  */
1486  if (txn->snapshot_now != NULL)
1487  {
1488  Assert(rbtxn_is_streamed(txn));
1490  }
1491 
1492  /*
1493  * Remove TXN from its containing list.
1494  *
1495  * Note: if txn is known as subxact, we are deleting the TXN from its
1496  * parent's list of known subxacts; this leaves the parent's nsubxacts
1497  * count too high, but we don't care. Otherwise, we are deleting the TXN
1498  * from the LSN-ordered list of toplevel TXNs.
1499  */
1500  dlist_delete(&txn->node);
1501 
1502  /* now remove reference from buffer */
1503  hash_search(rb->by_txn,
1504  (void *) &txn->xid,
1505  HASH_REMOVE,
1506  &found);
1507  Assert(found);
1508 
1509  /* remove entries spilled to disk */
1510  if (rbtxn_is_serialized(txn))
1511  ReorderBufferRestoreCleanup(rb, txn);
1512 
1513  /* deallocate */
1514  ReorderBufferReturnTXN(rb, txn);
1515 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:800
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2420 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2424 {
2425  ReorderBufferTXN *txn;
2426  Snapshot snapshot_now;
2427  CommandId command_id = FirstCommandId;
2428 
2429  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2430  false);
2431 
2432  /* unknown transaction, nothing to replay */
2433  if (txn == NULL)
2434  return;
2435 
2436  txn->final_lsn = commit_lsn;
2437  txn->end_lsn = end_lsn;
2438  txn->commit_time = commit_time;
2439  txn->origin_id = origin_id;
2440  txn->origin_lsn = origin_lsn;
2441 
2442  /*
2443  * If the transaction was (partially) streamed, we need to commit it in a
2444  * 'streamed' way. That is, we first stream the remaining part of the
2445  * transaction, and then invoke stream_commit message.
2446  *
2447  * Called after everything (origin ID, LSN, ...) is stored in the
2448  * transaction to avoid passing that information directly.
2449  */
2450  if (rbtxn_is_streamed(txn))
2451  {
2452  ReorderBufferStreamCommit(rb, txn);
2453  return;
2454  }
2455 
2456  /*
2457  * If this transaction has no snapshot, it didn't make any changes to the
2458  * database, so there's nothing to decode. Note that
2459  * ReorderBufferCommitChild will have transferred any snapshots from
2460  * subtransactions if there were any.
2461  */
2462  if (txn->base_snapshot == NULL)
2463  {
2464  Assert(txn->ninvalidations == 0);
2465  ReorderBufferCleanupTXN(rb, txn);
2466  return;
2467  }
2468 
2469  snapshot_now = txn->base_snapshot;
2470 
2471  /* Process and send the changes to output plugin. */
2472  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2473  command_id, false);
2474 }
uint32 CommandId
Definition: c.h:589
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:591
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:800
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1111 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1114 {
1115  ReorderBufferTXN *subtxn;
1116 
1117  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1118  InvalidXLogRecPtr, false);
1119 
1120  /*
1121  * No need to do anything if that subtxn didn't contain any changes
1122  */
1123  if (!subtxn)
1124  return;
1125 
1126  subtxn->final_lsn = commit_lsn;
1127  subtxn->end_lsn = end_lsn;
1128 
1129  /*
1130  * Assign this subxact as a child of the toplevel xact (no-op if already
1131  * done.)
1132  */
1133  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1134 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1692 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1694 {
1695  Snapshot snap;
1696  dlist_iter iter;
1697  int i = 0;
1698  Size size;
1699 
1700  size = sizeof(SnapshotData) +
1701  sizeof(TransactionId) * orig_snap->xcnt +
1702  sizeof(TransactionId) * (txn->nsubtxns + 1);
1703 
1704  snap = MemoryContextAllocZero(rb->context, size);
1705  memcpy(snap, orig_snap, sizeof(SnapshotData));
1706 
1707  snap->copied = true;
1708  snap->active_count = 1; /* mark as active so nobody frees it */
1709  snap->regd_count = 0;
1710  snap->xip = (TransactionId *) (snap + 1);
1711 
1712  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1713 
1714  /*
1715  * snap->subxip contains all txids that belong to our transaction which we
1716  * need to check via cmin/cmax. That's why we store the toplevel
1717  * transaction in there as well.
1718  */
1719  snap->subxip = snap->xip + snap->xcnt;
1720  snap->subxip[i++] = txn->xid;
1721 
1722  /*
1723  * subxcnt isn't decreased when subtransactions abort, so count manually.
1724  * Since it's an upper boundary it is safe to use it for the allocation
1725  * above.
1726  */
1727  snap->subxcnt = 1;
1728 
1729  dlist_foreach(iter, &txn->subtxns)
1730  {
1731  ReorderBufferTXN *sub_txn;
1732 
1733  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1734  snap->subxip[i++] = sub_txn->xid;
1735  snap->subxcnt++;
1736  }
1737 
1738  /* sort so we can bsearch() later */
1739  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1740 
1741  /* store the specified current CommandId */
1742  snap->curcid = cid;
1743 
1744  return snap;
1745 }
uint32 TransactionId
Definition: c.h:575
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:528
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:497
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 2894 of file reorderbuffer.c.

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferProcessTXN().

2895 {
2896  int i;
2897 
2898  for (i = 0; i < nmsgs; i++)
2900 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2574 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2575 {
2576  ReorderBufferTXN *txn;
2577 
2578  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2579  false);
2580 
2581  /* unknown, nothing to forget */
2582  if (txn == NULL)
2583  return;
2584 
2585  /* For streamed transactions notify the remote node about the abort. */
2586  if (rbtxn_is_streamed(txn))
2587  rb->stream_abort(rb, txn, lsn);
2588 
2589  /* cosmetic... */
2590  txn->final_lsn = lsn;
2591 
2592  /*
2593  * Process cache invalidation messages if there are any. Even if we're not
2594  * interested in the transaction's contents, it could have manipulated the
2595  * catalog and we need to update the caches according to that.
2596  */
2597  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2599  txn->invalidations);
2600  else
2601  Assert(txn->ninvalidations == 0);
2602 
2603  /* remove potential on-disk data, and deallocate */
2604  ReorderBufferCleanupTXN(rb, txn);
2605 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:800
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 372 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

373 {
374  MemoryContext context = rb->context;
375 
376  /*
377  * We free separately allocated data by entirely scrapping reorderbuffer's
378  * memory context.
379  */
380  MemoryContextDelete(context);
381 
382  /* Free disk space used by unconsumed reorder buffers */
384 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:677
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1751 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1752 {
1753  if (snap->copied)
1754  pfree(snap);
1755  else
1757 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1057
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 936 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

937 {
938  ReorderBufferTXN *txn;
939 
940  AssertTXNLsnOrder(rb);
941 
943  return NULL;
944 
946 
949  return txn;
950 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 964 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

965 {
966  ReorderBufferTXN *txn;
967 
968  AssertTXNLsnOrder(rb);
969 
971  return InvalidTransactionId;
972 
973  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
975  return txn->base_snapshot->xmin;
976 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 563 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

564 {
565  Oid *relids;
566  Size alloc_len;
567 
568  alloc_len = sizeof(Oid) * nrelids;
569 
570  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
571 
572  return relids;
573 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:528
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 527 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

528 {
529  ReorderBufferTupleBuf *tuple;
530  Size alloc_len;
531 
532  alloc_len = tuple_len + SizeofHeapTupleHeader;
533 
534  tuple = (ReorderBufferTupleBuf *)
536  sizeof(ReorderBufferTupleBuf) +
537  MAXIMUM_ALIGNOF + alloc_len);
538  tuple->alloc_tuple_size = alloc_len;
539  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
540 
541  return tuple;
542 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:528
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 390 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

391 {
392  ReorderBufferTXN *txn;
393 
394  txn = (ReorderBufferTXN *)
396 
397  memset(txn, 0, sizeof(ReorderBufferTXN));
398 
399  dlist_init(&txn->changes);
400  dlist_init(&txn->tuplecids);
401  dlist_init(&txn->subtxns);
402 
403  /* InvalidCommandId is not zero, so set it explicitly */
405  txn->output_plugin_private = NULL;
406 
407  return txn;
408 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:592
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
void * output_plugin_private
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2614 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), and ReorderBufferForget().

2616 {
2617  bool use_subtxn = IsTransactionOrTransactionBlock();
2618  int i;
2619 
2620  if (use_subtxn)
2621  BeginInternalSubTransaction("replay");
2622 
2623  /*
2624  * Force invalidations to happen outside of a valid transaction - that way
2625  * entries will just be marked as invalid without accessing the catalog.
2626  * That's advantageous because we don't need to setup the full state
2627  * necessary for catalog access.
2628  */
2629  if (use_subtxn)
2631 
2632  for (i = 0; i < ninvalidations; i++)
2633  LocalExecuteInvalidationMessage(&invalidations[i]);
2634 
2635  if (use_subtxn)
2637 }
void AbortCurrentTransaction(void)
Definition: xact.c:3212
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4703
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4514
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4409
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1153 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1154 {
1156  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1157  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1158 
1159  if (pos_a < pos_b)
1160  return 1;
1161  else if (pos_a == pos_b)
1162  return 0;
1163  return -1;
1164 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1390 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1392 {
1393  int32 off;
1394 
1395  for (off = 0; off < state->nr_txns; off++)
1396  {
1397  if (state->entries[off].file.vfd != -1)
1398  FileClose(state->entries[off].file.vfd);
1399  }
1400 
1401  /* free memory we might have "leaked" in the last *Next call */
1402  if (!dlist_is_empty(&state->old_change))
1403  {
1404  ReorderBufferChange *change;
1405 
1406  change = dlist_container(ReorderBufferChange, node,
1407  dlist_pop_head_node(&state->old_change));
1408  ReorderBufferReturnChange(rb, change, true);
1409  Assert(dlist_is_empty(&state->old_change));
1410  }
1411 
1412  binaryheap_free(state->heap);
1413  pfree(state);
1414 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:417
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
void FileClose(File file)
Definition: fd.c:1853
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1176 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1178 {
1179  Size nr_txns = 0;
1181  dlist_iter cur_txn_i;
1182  int32 off;
1183 
1184  *iter_state = NULL;
1185 
1186  /* Check ordering of changes in the toplevel transaction. */
1187  AssertChangeLsnOrder(txn);
1188 
1189  /*
1190  * Calculate the size of our heap: one element for every transaction that
1191  * contains changes. (Besides the transactions already in the reorder
1192  * buffer, we count the one we were directly passed.)
1193  */
1194  if (txn->nentries > 0)
1195  nr_txns++;
1196 
1197  dlist_foreach(cur_txn_i, &txn->subtxns)
1198  {
1199  ReorderBufferTXN *cur_txn;
1200 
1201  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1202 
1203  /* Check ordering of changes in this subtransaction. */
1204  AssertChangeLsnOrder(cur_txn);
1205 
1206  if (cur_txn->nentries > 0)
1207  nr_txns++;
1208  }
1209 
1210  /* allocate iteration state */
1211  state = (ReorderBufferIterTXNState *)
1213  sizeof(ReorderBufferIterTXNState) +
1214  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1215 
1216  state->nr_txns = nr_txns;
1217  dlist_init(&state->old_change);
1218 
1219  for (off = 0; off < state->nr_txns; off++)
1220  {
1221  state->entries[off].file.vfd = -1;
1222  state->entries[off].segno = 0;
1223  }
1224 
1225  /* allocate heap */
1226  state->heap = binaryheap_allocate(state->nr_txns,
1228  state);
1229 
1230  /* Now that the state fields are initialized, it is safe to return it. */
1231  *iter_state = state;
1232 
1233  /*
1234  * Now insert items into the binary heap, in an unordered fashion. (We
1235  * will run a heap assembly step at the end; this is more efficient.)
1236  */
1237 
1238  off = 0;
1239 
1240  /* add toplevel transaction if it contains changes */
1241  if (txn->nentries > 0)
1242  {
1243  ReorderBufferChange *cur_change;
1244 
1245  if (rbtxn_is_serialized(txn))
1246  {
1247  /* serialize remaining changes */
1248  ReorderBufferSerializeTXN(rb, txn);
1249  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1250  &state->entries[off].segno);
1251  }
1252 
1253  cur_change = dlist_head_element(ReorderBufferChange, node,
1254  &txn->changes);
1255 
1256  state->entries[off].lsn = cur_change->lsn;
1257  state->entries[off].change = cur_change;
1258  state->entries[off].txn = txn;
1259 
1261  }
1262 
1263  /* add subtransactions if they contain changes */
1264  dlist_foreach(cur_txn_i, &txn->subtxns)
1265  {
1266  ReorderBufferTXN *cur_txn;
1267 
1268  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1269 
1270  if (cur_txn->nentries > 0)
1271  {
1272  ReorderBufferChange *cur_change;
1273 
1274  if (rbtxn_is_serialized(cur_txn))
1275  {
1276  /* serialize remaining changes */
1277  ReorderBufferSerializeTXN(rb, cur_txn);
1278  ReorderBufferRestoreChanges(rb, cur_txn,
1279  &state->entries[off].file,
1280  &state->entries[off].segno);
1281  }
1282  cur_change = dlist_head_element(ReorderBufferChange, node,
1283  &cur_txn->changes);
1284 
1285  state->entries[off].lsn = cur_change->lsn;
1286  state->entries[off].change = cur_change;
1287  state->entries[off].txn = cur_txn;
1288 
1290  }
1291  }
1292 
1293  /* assemble a valid binary heap */
1294  binaryheap_build(state->heap);
1295 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:417
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:528
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1304 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1305 {
1306  ReorderBufferChange *change;
1308  int32 off;
1309 
1310  /* nothing there anymore */
1311  if (state->heap->bh_size == 0)
1312  return NULL;
1313 
1314  off = DatumGetInt32(binaryheap_first(state->heap));
1315  entry = &state->entries[off];
1316 
1317  /* free memory we might have "leaked" in the previous *Next call */
1318  if (!dlist_is_empty(&state->old_change))
1319  {
1320  change = dlist_container(ReorderBufferChange, node,
1321  dlist_pop_head_node(&state->old_change));
1322  ReorderBufferReturnChange(rb, change, true);
1323  Assert(dlist_is_empty(&state->old_change));
1324  }
1325 
1326  change = entry->change;
1327 
1328  /*
1329  * update heap with information about which transaction has the next
1330  * relevant change in LSN order
1331  */
1332 
1333  /* there are in-memory changes */
1334  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1335  {
1336  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1337  ReorderBufferChange *next_change =
1338  dlist_container(ReorderBufferChange, node, next);
1339 
1340  /* txn stays the same */
1341  state->entries[off].lsn = next_change->lsn;
1342  state->entries[off].change = next_change;
1343 
1345  return change;
1346  }
1347 
1348  /* try to load changes from disk */
1349  if (entry->txn->nentries != entry->txn->nentries_mem)
1350  {
1351  /*
1352  * Ugly: restoring changes will reuse *Change records, thus delete the
1353  * current one from the per-tx list and only free in the next call.
1354  */
1355  dlist_delete(&change->node);
1356  dlist_push_tail(&state->old_change, &change->node);
1357 
1358  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1359  &state->entries[off].segno))
1360  {
1361  /* successfully restored changes from disk */
1362  ReorderBufferChange *next_change =
1364  &entry->txn->changes);
1365 
1366  elog(DEBUG2, "restored %u/%u changes from disk",
1367  (uint32) entry->txn->nentries_mem,
1368  (uint32) entry->txn->nentries);
1369 
1370  Assert(entry->txn->nentries_mem);
1371  /* txn stays the same */
1372  state->entries[off].lsn = next_change->lsn;
1373  state->entries[off].change = next_change;
1375 
1376  return change;
1377  }
1378  }
1379 
1380  /* ok, no changes there anymore, remove */
1381  binaryheap_remove_first(state->heap);
1382 
1383  return change;
1384 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:417
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:429
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:228
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3047 of file reorderbuffer.c.

References dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_incomplete_tuple, ReorderBuffer::toplevel_by_lsn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3048 {
3049  dlist_iter iter;
3050  Size largest_size = 0;
3051  ReorderBufferTXN *largest = NULL;
3052 
3053  /* Find the largest top-level transaction. */
3054  dlist_foreach(iter, &rb->toplevel_by_lsn)
3055  {
3056  ReorderBufferTXN *txn;
3057 
3058  txn = dlist_container(ReorderBufferTXN, node, iter.cur);
3059 
3060  if ((largest != NULL || txn->total_size > largest_size) &&
3061  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3062  {
3063  largest = txn;
3064  largest_size = txn->total_size;
3065  }
3066  }
3067 
3068  return largest;
3069 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define rbtxn_has_incomplete_tuple(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
size_t Size
Definition: c.h:528

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3003 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3004 {
3005  HASH_SEQ_STATUS hash_seq;
3007  ReorderBufferTXN *largest = NULL;
3008 
3009  hash_seq_init(&hash_seq, rb->by_txn);
3010  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3011  {
3012  ReorderBufferTXN *txn = ent->txn;
3013 
3014  /* if the current transaction is larger, remember it */
3015  if ((!largest) || (txn->size > largest->size))
3016  largest = txn;
3017  }
3018 
3019  Assert(largest);
3020  Assert(largest->size > 0);
3021  Assert(largest->size <= rb->size);
3022 
3023  return largest;
3024 }
#define Assert(condition)
Definition: c.h:800
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391
ReorderBufferTXN * txn

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 679 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, IsInsertOrUpdate, IsSpecConfirm, IsSpecInsert, rbtxn_has_incomplete_tuple, RBTXN_HAS_SPEC_INSERT, rbtxn_has_spec_insert, RBTXN_HAS_TOAST_INSERT, rbtxn_has_toast_insert, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

682 {
683  ReorderBufferTXN *toptxn;
684 
685  /*
686  * The partial changes need to be processed only while streaming
687  * in-progress transactions.
688  */
689  if (!ReorderBufferCanStream(rb))
690  return;
691 
692  /* Get the top transaction. */
693  if (txn->toptxn != NULL)
694  toptxn = txn->toptxn;
695  else
696  toptxn = txn;
697 
698  /*
699  * Set the toast insert bit whenever we get toast insert to indicate a
700  * partial change and clear it when we get the insert or update on main
701  * table (Both update and insert will do the insert in the toast table).
702  */
703  if (toast_insert)
705  else if (rbtxn_has_toast_insert(toptxn) &&
706  IsInsertOrUpdate(change->action))
707  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
708 
709  /*
710  * Set the spec insert bit whenever we get the speculative insert to
711  * indicate the partial change and clear the same on speculative confirm.
712  */
713  if (IsSpecInsert(change->action))
714  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
715  else if (IsSpecConfirm(change->action))
716  {
717  /*
718  * Speculative confirm change must be preceded by speculative
719  * insertion.
720  */
721  Assert(rbtxn_has_spec_insert(toptxn));
722  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
723  }
724 
725  /*
726  * Stream the transaction if it is serialized before and the changes are
727  * now complete in the top-level transaction.
728  *
729  * The reason for doing the streaming of such a transaction as soon as we
730  * get the complete change for it is that previously it would have reached
731  * the memory threshold and wouldn't get streamed because of incomplete
732  * changes. Delaying such transactions would increase apply lag for them.
733  */
735  !(rbtxn_has_incomplete_tuple(toptxn)) &&
736  rbtxn_is_serialized(txn))
737  ReorderBufferStreamTXN(rb, toptxn);
738 }
#define RBTXN_HAS_TOAST_INSERT
#define rbtxn_has_incomplete_tuple(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_toast_insert(txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
#define rbtxn_has_spec_insert(txn)
#define Assert(condition)
Definition: c.h:800
#define IsSpecConfirm(action)
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define RBTXN_HAS_SPEC_INSERT
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 1935 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCommit(), and ReorderBufferStreamTXN().

1940 {
1941  bool using_subtxn;
1943  ReorderBufferIterTXNState *volatile iterstate = NULL;
1944  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
1945  ReorderBufferChange *volatile specinsert = NULL;
1946  volatile bool stream_started = false;
1947  ReorderBufferTXN *volatile curtxn = NULL;
1948 
1949  /* build data to be able to lookup the CommandIds of catalog tuples */
1951 
1952  /* setup the initial snapshot */
1953  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1954 
1955  /*
1956  * Decoding needs access to syscaches et al., which in turn use
1957  * heavyweight locks and such. Thus we need to have enough state around to
1958  * keep track of those. The easiest way is to simply use a transaction
1959  * internally. That also allows us to easily enforce that nothing writes
1960  * to the database by checking for xid assignments.
1961  *
1962  * When we're called via the SQL SRF there's already a transaction
1963  * started, so start an explicit subtransaction there.
1964  */
1965  using_subtxn = IsTransactionOrTransactionBlock();
1966 
1967  PG_TRY();
1968  {
1969  ReorderBufferChange *change;
1970 
1971  if (using_subtxn)
1972  BeginInternalSubTransaction(streaming ? "stream" : "replay");
1973  else
1975 
1976  /* We only need to send begin/commit for non-streamed transactions. */
1977  if (!streaming)
1978  rb->begin(rb, txn);
1979 
1980  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1981  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1982  {
1983  Relation relation = NULL;
1984  Oid reloid;
1985 
1986  /*
1987  * We can't call start stream callback before processing first
1988  * change.
1989  */
1990  if (prev_lsn == InvalidXLogRecPtr)
1991  {
1992  if (streaming)
1993  {
1994  txn->origin_id = change->origin_id;
1995  rb->stream_start(rb, txn, change->lsn);
1996  stream_started = true;
1997  }
1998  }
1999 
2000  /*
2001  * Enforce correct ordering of changes, merged from multiple
2002  * subtransactions. The changes may have the same LSN due to
2003  * MULTI_INSERT xlog records.
2004  */
2005  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2006 
2007  prev_lsn = change->lsn;
2008 
2009  /* Set the current xid to detect concurrent aborts. */
2010  if (streaming)
2011  {
2012  curtxn = change->txn;
2013  SetupCheckXidLive(curtxn->xid);
2014  }
2015 
2016  switch (change->action)
2017  {
2019 
2020  /*
2021  * Confirmation for speculative insertion arrived. Simply
2022  * use as a normal record. It'll be cleaned up at the end
2023  * of INSERT processing.
2024  */
2025  if (specinsert == NULL)
2026  elog(ERROR, "invalid ordering of speculative insertion changes");
2027  Assert(specinsert->data.tp.oldtuple == NULL);
2028  change = specinsert;
2030 
2031  /* intentionally fall through */
2035  Assert(snapshot_now);
2036 
2037  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2038  change->data.tp.relnode.relNode);
2039 
2040  /*
2041  * Mapped catalog tuple without data, emitted while
2042  * catalog table was in the process of being rewritten. We
2043  * can fail to look up the relfilenode, because the
2044  * relmapper has no "historic" view, in contrast to normal
2045  * the normal catalog during decoding. Thus repeated
2046  * rewrites can cause a lookup failure. That's OK because
2047  * we do not decode catalog changes anyway. Normally such
2048  * tuples would be skipped over below, but we can't
2049  * identify whether the table should be logically logged
2050  * without mapping the relfilenode to the oid.
2051  */
2052  if (reloid == InvalidOid &&
2053  change->data.tp.newtuple == NULL &&
2054  change->data.tp.oldtuple == NULL)
2055  goto change_done;
2056  else if (reloid == InvalidOid)
2057  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2058  relpathperm(change->data.tp.relnode,
2059  MAIN_FORKNUM));
2060 
2061  relation = RelationIdGetRelation(reloid);
2062 
2063  if (!RelationIsValid(relation))
2064  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2065  reloid,
2066  relpathperm(change->data.tp.relnode,
2067  MAIN_FORKNUM));
2068 
2069  if (!RelationIsLogicallyLogged(relation))
2070  goto change_done;
2071 
2072  /*
2073  * Ignore temporary heaps created during DDL unless the
2074  * plugin has asked for them.
2075  */
2076  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2077  goto change_done;
2078 
2079  /*
2080  * For now ignore sequence changes entirely. Most of the
2081  * time they don't log changes using records we
2082  * understand, so it doesn't make sense to handle the few
2083  * cases we do.
2084  */
2085  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2086  goto change_done;
2087 
2088  /* user-triggered change */
2089  if (!IsToastRelation(relation))
2090  {
2091  ReorderBufferToastReplace(rb, txn, relation, change);
2092  ReorderBufferApplyChange(rb, txn, relation, change,
2093  streaming);
2094 
2095  /*
2096  * Only clear reassembled toast chunks if we're sure
2097  * they're not required anymore. The creator of the
2098  * tuple tells us.
2099  */
2100  if (change->data.tp.clear_toast_afterwards)
2101  ReorderBufferToastReset(rb, txn);
2102  }
2103  /* we're not interested in toast deletions */
2104  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2105  {
2106  /*
2107  * Need to reassemble the full toasted Datum in
2108  * memory, to ensure the chunks don't get reused till
2109  * we're done remove it from the list of this
2110  * transaction's changes. Otherwise it will get
2111  * freed/reused while restoring spooled data from
2112  * disk.
2113  */
2114  Assert(change->data.tp.newtuple != NULL);
2115 
2116  dlist_delete(&change->node);
2117  ReorderBufferToastAppendChunk(rb, txn, relation,
2118  change);
2119  }
2120 
2121  change_done:
2122 
2123  /*
2124  * Either speculative insertion was confirmed, or it was
2125  * unsuccessful and the record isn't needed anymore.
2126  */
2127  if (specinsert != NULL)
2128  {
2129  ReorderBufferReturnChange(rb, specinsert, true);
2130  specinsert = NULL;
2131  }
2132 
2133  if (RelationIsValid(relation))
2134  {
2135  RelationClose(relation);
2136  relation = NULL;
2137  }
2138  break;
2139 
2141 
2142  /*
2143  * Speculative insertions are dealt with by delaying the
2144  * processing of the insert until the confirmation record
2145  * arrives. For that we simply unlink the record from the
2146  * chain, so it does not get freed/reused while restoring
2147  * spooled data from disk.
2148  *
2149  * This is safe in the face of concurrent catalog changes
2150  * because the relevant relation can't be changed between
2151  * speculative insertion and confirmation due to
2152  * CheckTableNotInUse() and locking.
2153  */
2154 
2155  /* clear out a pending (and thus failed) speculation */
2156  if (specinsert != NULL)
2157  {
2158  ReorderBufferReturnChange(rb, specinsert, true);
2159  specinsert = NULL;
2160  }
2161 
2162  /* and memorize the pending insertion */
2163  dlist_delete(&change->node);
2164  specinsert = change;
2165  break;
2166 
2168  {
2169  int i;
2170  int nrelids = change->data.truncate.nrelids;
2171  int nrelations = 0;
2172  Relation *relations;
2173 
2174  relations = palloc0(nrelids * sizeof(Relation));
2175  for (i = 0; i < nrelids; i++)
2176  {
2177  Oid relid = change->data.truncate.relids[i];
2178  Relation relation;
2179 
2180  relation = RelationIdGetRelation(relid);
2181 
2182  if (!RelationIsValid(relation))
2183  elog(ERROR, "could not open relation with OID %u", relid);
2184 
2185  if (!RelationIsLogicallyLogged(relation))
2186  continue;
2187 
2188  relations[nrelations++] = relation;
2189  }
2190 
2191  /* Apply the truncate. */
2192  ReorderBufferApplyTruncate(rb, txn, nrelations,
2193  relations, change,
2194  streaming);
2195 
2196  for (i = 0; i < nrelations; i++)
2197  RelationClose(relations[i]);
2198 
2199  break;
2200  }
2201 
2203  ReorderBufferApplyMessage(rb, txn, change, streaming);
2204  break;
2205 
2207  /* Execute the invalidation messages locally */
2209  change->data.inval.ninvalidations,
2210  change->data.inval.invalidations);
2211  break;
2212 
2214  /* get rid of the old */
2215  TeardownHistoricSnapshot(false);
2216 
2217  if (snapshot_now->copied)
2218  {
2219  ReorderBufferFreeSnap(rb, snapshot_now);
2220  snapshot_now =
2221  ReorderBufferCopySnap(rb, change->data.snapshot,
2222  txn, command_id);
2223  }
2224 
2225  /*
2226  * Restored from disk, need to be careful not to double
2227  * free. We could introduce refcounting for that, but for
2228  * now this seems infrequent enough not to care.
2229  */
2230  else if (change->data.snapshot->copied)
2231  {
2232  snapshot_now =
2233  ReorderBufferCopySnap(rb, change->data.snapshot,
2234  txn, command_id);
2235  }
2236  else
2237  {
2238  snapshot_now = change->data.snapshot;
2239  }
2240 
2241  /* and continue with the new one */
2242  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2243  break;
2244 
2246  Assert(change->data.command_id != InvalidCommandId);
2247 
2248  if (command_id < change->data.command_id)
2249  {
2250  command_id = change->data.command_id;
2251 
2252  if (!snapshot_now->copied)
2253  {
2254  /* we don't use the global one anymore */
2255  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2256  txn, command_id);
2257  }
2258 
2259  snapshot_now->curcid = command_id;
2260 
2261  TeardownHistoricSnapshot(false);
2262  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2263  }
2264 
2265  break;
2266 
2268  elog(ERROR, "tuplecid value in changequeue");
2269  break;
2270  }
2271  }
2272 
2273  /*
2274  * There's a speculative insertion remaining, just clean in up, it
2275  * can't have been successful, otherwise we'd gotten a confirmation
2276  * record.
2277  */
2278  if (specinsert)
2279  {
2280  ReorderBufferReturnChange(rb, specinsert, true);
2281  specinsert = NULL;
2282  }
2283 
2284  /* clean up the iterator */
2285  ReorderBufferIterTXNFinish(rb, iterstate);
2286  iterstate = NULL;
2287 
2288  /*
2289  * Done with current changes, send the last message for this set of
2290  * changes depending upon streaming mode.
2291  */
2292  if (streaming)
2293  {
2294  if (stream_started)
2295  {
2296  rb->stream_stop(rb, txn, prev_lsn);
2297  stream_started = false;
2298  }
2299  }
2300  else
2301  rb->commit(rb, txn, commit_lsn);
2302 
2303  /* this is just a sanity check against bad output plugin behaviour */
2305  elog(ERROR, "output plugin used XID %u",
2307 
2308  /*
2309  * Remember the command ID and snapshot for the next set of changes in
2310  * streaming mode.
2311  */
2312  if (streaming)
2313  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2314  else if (snapshot_now->copied)
2315  ReorderBufferFreeSnap(rb, snapshot_now);
2316 
2317  /* cleanup */
2318  TeardownHistoricSnapshot(false);
2319 
2320  /*
2321  * Aborting the current (sub-)transaction as a whole has the right
2322  * semantics. We want all locks acquired in here to be released, not
2323  * reassigned to the parent and we do not want any database access
2324  * have persistent effects.
2325  */
2327 
2328  /* make sure there's no cache pollution */
2330 
2331  if (using_subtxn)
2333 
2334  /*
2335  * If we are streaming the in-progress transaction then discard the
2336  * changes that we just streamed, and mark the transactions as
2337  * streamed (if they contained changes). Otherwise, remove all the
2338  * changes and deallocate the ReorderBufferTXN.
2339  */
2340  if (streaming)
2341  {
2342  ReorderBufferTruncateTXN(rb, txn);
2343 
2344  /* Reset the CheckXidAlive */
2346  }
2347  else
2348  ReorderBufferCleanupTXN(rb, txn);
2349  }
2350  PG_CATCH();
2351  {
2352  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2353  ErrorData *errdata = CopyErrorData();
2354 
2355  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2356  if (iterstate)
2357  ReorderBufferIterTXNFinish(rb, iterstate);
2358 
2360 
2361  /*
2362  * Force cache invalidation to happen outside of a valid transaction
2363  * to prevent catalog access as we just caught an error.
2364  */
2366 
2367  /* make sure there's no cache pollution */
2369  txn->invalidations);
2370 
2371  if (using_subtxn)
2373 
2374  /*
2375  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2376  * abort of the (sub)transaction we are streaming. We need to do the
2377  * cleanup and return gracefully on this error, see SetupCheckXidLive.
2378  */
2379  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
2380  {
2381  /*
2382  * This error can only occur when we are sending the data in
2383  * streaming mode and the streaming is not finished yet.
2384  */
2385  Assert(streaming);
2386  Assert(stream_started);
2387 
2388  /* Cleanup the temporary error state. */
2389  FlushErrorState();
2390  FreeErrorData(errdata);
2391  errdata = NULL;
2392  curtxn->concurrent_abort = true;
2393 
2394  /* Reset the TXN so that it is allowed to stream remaining data. */
2395  ReorderBufferResetTXN(rb, txn, snapshot_now,
2396  command_id, prev_lsn,
2397  specinsert);
2398  }
2399  else
2400  {
2401  ReorderBufferCleanupTXN(rb, txn);
2402  MemoryContextSwitchTo(ecxt);
2403  PG_RE_THROW();
2404  }
2405  }
2406  PG_END_TRY();
2407 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3212
bool IsToastRelation(Relation relation)
Definition: catalog.c:138
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:378
ErrorData * CopyErrorData(void)
Definition: elog.c:1552
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4703
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2047
ReorderBufferCommitCB commit
struct ReorderBufferChange::@98::@103 inval
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:636
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void FlushErrorState(void)
Definition: elog.c:1646
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1608
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:430
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4514
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
#define InvalidTransactionId
Definition: transam.h:31
void RelationClose(Relation relation)
Definition: relcache.c:2111
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:89
TransactionId CheckXidAlive
Definition: xact.c:95
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:981
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:592
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
#define PG_CATCH()
Definition: elog.h:319
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2847
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4409
SharedInvalidationMessage * invalidations
#define PG_RE_THROW()
Definition: elog.h:350
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2031
struct ReorderBufferChange::@98::@100 truncate
#define elog(elevel,...)
Definition: elog.h:228
struct ReorderBufferChange::@98::@99 tp
int i
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:309
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2005
#define PG_END_TRY()
Definition: elog.h:334
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2650 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2651 {
2652  /* many records won't have an xid assigned, centralize check here */
2653  if (xid != InvalidTransactionId)
2654  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2655 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 745 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

747 {
748  ReorderBufferTXN *txn;
749 
750  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
751 
752  /*
753  * While streaming the previous changes we have detected that the
754  * transaction is aborted. So there is no point in collecting further
755  * changes for it.
756  */
757  if (txn->concurrent_abort)
758  {
759  /*
760  * We don't need to update memory accounting for this change as we
761  * have not added it to the queue yet.
762  */
763  ReorderBufferReturnChange(rb, change, false);
764  return;
765  }
766 
767  change->lsn = lsn;
768  change->txn = txn;
769 
770  Assert(InvalidXLogRecPtr != lsn);
771  dlist_push_tail(&txn->changes, &change->node);
772  txn->nentries++;
773  txn->nentries_mem++;
774 
775  /* update memory accounting information */
776  ReorderBufferChangeMemoryUpdate(rb, change, true);
777 
778  /* process partial change */
779  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
780 
781  /* check the memory limits and evict something if needed */
783 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:800
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 789 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

793 {
794  if (transactional)
795  {
796  MemoryContext oldcontext;
797  ReorderBufferChange *change;
798 
800 
801  oldcontext = MemoryContextSwitchTo(rb->context);
802 
803  change = ReorderBufferGetChange(rb);
805  change->data.msg.prefix = pstrdup(prefix);
806  change->data.msg.message_size = message_size;
807  change->data.msg.message = palloc(message_size);
808  memcpy(change->data.msg.message, message, message_size);
809 
810  ReorderBufferQueueChange(rb, xid, lsn, change, false);
811 
812  MemoryContextSwitchTo(oldcontext);
813  }
814  else
815  {
816  ReorderBufferTXN *txn = NULL;
817  volatile Snapshot snapshot_now = snapshot;
818 
819  if (xid != InvalidTransactionId)
820  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
821 
822  /* setup snapshot to allow catalog access */
823  SetupHistoricSnapshot(snapshot_now, NULL);
824  PG_TRY();
825  {
826  rb->message(rb, txn, lsn, false, prefix, message_size, message);
827 
829  }
830  PG_CATCH();
831  {
833  PG_RE_THROW();
834  }
835  PG_END_TRY();
836  }
837 }
char * pstrdup(const char *in)
Definition: mcxt.c:1187
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2047
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define PG_CATCH()
Definition: elog.h:319
#define Assert(condition)
Definition: c.h:800
#define PG_RE_THROW()
Definition: elog.h:350
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2031
#define PG_TRY()
Definition: elog.h:309
struct ReorderBufferChange::@98::@101 msg
#define PG_END_TRY()
Definition: elog.h:334

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 1896 of file reorderbuffer.c.

References ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

1901 {
1902  /* Discard the changes that we just streamed */
1903  ReorderBufferTruncateTXN(rb, txn);
1904 
1905  /* Free all resources allocated for toast reconstruction */
1906  ReorderBufferToastReset(rb, txn);
1907 
1908  /* Return the spec insert change if it is not NULL */
1909  if (specinsert != NULL)
1910  {
1911  ReorderBufferReturnChange(rb, specinsert, true);
1912  specinsert = NULL;
1913  }
1914 
1915  /* Stop the stream. */
1916  rb->stream_stop(rb, txn, last_lsn);
1917 
1918  /* Remember the command ID and snapshot for the streaming run */
1919  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1920 }
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 3824 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

3826 {
3827  ReorderBufferDiskChange *ondisk;
3828  ReorderBufferChange *change;
3829 
3830  ondisk = (ReorderBufferDiskChange *) data;
3831 
3832  change = ReorderBufferGetChange(rb);
3833 
3834  /* copy static part */
3835  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
3836 
3837  data += sizeof(ReorderBufferDiskChange);
3838 
3839  /* restore individual stuff */
3840  switch (change->action)
3841  {
3842  /* fall through these, they're all similar enough */
3847  if (change->data.tp.oldtuple)
3848  {
3849  uint32 tuplelen = ((HeapTuple) data)->t_len;
3850 
3851  change->data.tp.oldtuple =
3853 
3854  /* restore ->tuple */
3855  memcpy(&change->data.tp.oldtuple->tuple, data,
3856  sizeof(HeapTupleData));
3857  data += sizeof(HeapTupleData);
3858 
3859  /* reset t_data pointer into the new tuplebuf */
3860  change->data.tp.oldtuple->tuple.t_data =
3861  ReorderBufferTupleBufData(change->data.tp.oldtuple);
3862 
3863  /* restore tuple data itself */
3864  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
3865  data += tuplelen;
3866  }
3867 
3868  if (change->data.tp.newtuple)
3869  {
3870  /* here, data might not be suitably aligned! */
3871  uint32 tuplelen;
3872 
3873  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
3874  sizeof(uint32));
3875 
3876  change->data.tp.newtuple =
3878 
3879  /* restore ->tuple */
3880  memcpy(&change->data.tp.newtuple->tuple, data,
3881  sizeof(HeapTupleData));
3882  data += sizeof(HeapTupleData);
3883 
3884  /* reset t_data pointer into the new tuplebuf */
3885  change->data.tp.newtuple->tuple.t_data =
3886  ReorderBufferTupleBufData(change->data.tp.newtuple);
3887 
3888  /* restore tuple data itself */
3889  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
3890  data += tuplelen;
3891  }
3892 
3893  break;
3895  {
3896  Size prefix_size;
3897 
3898  /* read prefix */
3899  memcpy(&prefix_size, data, sizeof(Size));
3900  data += sizeof(Size);
3901  change->data.msg.prefix = MemoryContextAlloc(rb->context,
3902  prefix_size);
3903  memcpy(change->data.msg.prefix, data, prefix_size);
3904  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
3905  data += prefix_size;
3906 
3907  /* read the message */
3908  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3909  data += sizeof(Size);
3910  change->data.msg.message = MemoryContextAlloc(rb->context,
3911  change->data.msg.message_size);
3912  memcpy(change->data.msg.message, data,
3913  change->data.msg.message_size);
3914  data += change->data.msg.message_size;
3915 
3916  break;
3917  }
3919  {
3920  Size inval_size = sizeof(SharedInvalidationMessage) *
3921  change->data.inval.ninvalidations;
3922 
3923  change->data.inval.invalidations =
3924  MemoryContextAlloc(rb->context, inval_size);
3925 
3926  /* read the message */
3927  memcpy(change->data.inval.invalidations, data, inval_size);
3928 
3929  break;
3930  }
3932  {
3933  Snapshot oldsnap;
3934  Snapshot newsnap;
3935  Size size;
3936 
3937  oldsnap = (Snapshot) data;
3938 
3939  size = sizeof(SnapshotData) +
3940  sizeof(TransactionId) * oldsnap->xcnt +
3941  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3942 
3943  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3944 
3945  newsnap = change->data.snapshot;
3946 
3947  memcpy(newsnap, data, size);
3948  newsnap->xip = (TransactionId *)
3949  (((char *) newsnap) + sizeof(SnapshotData));
3950  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3951  newsnap->copied = true;
3952  break;
3953  }
3954  /* the base struct contains all the data, easy peasy */
3956  {
3957  Oid *relids;
3958 
3959  relids = ReorderBufferGetRelids(rb,
3960  change->data.truncate.nrelids);
3961  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3962  change->data.truncate.relids = relids;
3963 
3964  break;
3965  }
3969  break;
3970  }
3971 
3972  dlist_push_tail(&txn->changes, &change->node);
3973  txn->nentries_mem++;
3974 
3975  /*
3976  * Update memory accounting for the restored change. We need to do this
3977  * although we don't check the memory limit when restoring the changes in
3978  * this branch (we only do that when initially queueing the changes after
3979  * decoding), because we will release the changes later, and that will
3980  * update the accounting too (subtracting the size from the counters). And
3981  * we don't want to underflow there.
3982  */
3983  ReorderBufferChangeMemoryUpdate(rb, change, true);
3984 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
uint32 TransactionId
Definition: c.h:575
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@98 data
struct ReorderBufferChange::@98::@103 inval
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:429
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:800
size_t Size
Definition: c.h:528
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
struct ReorderBufferChange::@98::@100 truncate
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct ReorderBufferChange::@98::@99 tp
ReorderBufferChange change
struct ReorderBufferChange::@98::@101 msg
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:723
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 3683 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

3685 {
3686  Size restored = 0;
3687  XLogSegNo last_segno;
3688  dlist_mutable_iter cleanup_iter;
3689  File *fd = &file->vfd;
3690 
3693 
3694  /* free current entries, so we have memory for more */
3695  dlist_foreach_modify(cleanup_iter, &txn->changes)
3696  {
3698  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
3699 
3700  dlist_delete(&cleanup->node);
3701  ReorderBufferReturnChange(rb, cleanup, true);
3702  }
3703  txn->nentries_mem = 0;
3704  Assert(dlist_is_empty(&txn->changes));
3705 
3706  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
3707 
3708  while (restored < max_changes_in_memory && *segno <= last_segno)
3709  {
3710  int readBytes;
3711  ReorderBufferDiskChange *ondisk;
3712 
3713  if (*fd == -1)
3714  {
3715  char path[MAXPGPATH];
3716 
3717  /* first time in */
3718  if (*segno == 0)
3719  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
3720 
3721  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
3722 
3723  /*
3724  * No need to care about TLIs here, only used during a single run,
3725  * so each LSN only maps to a specific WAL record.
3726  */
3728  *segno);
3729 
3730  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
3731 
3732  /* No harm in resetting the offset even in case of failure */
3733  file->curOffset = 0;
3734 
3735  if (*fd < 0 && errno == ENOENT)
3736  {
3737  *fd = -1;
3738  (*segno)++;
3739  continue;
3740  }
3741  else if (*fd < 0)
3742  ereport(ERROR,
3744  errmsg("could not open file \"%s\": %m",
3745  path)));
3746  }
3747 
3748  /*
3749  * Read the statically sized part of a change which has information
3750  * about the total size. If we couldn't read a record, we're at the
3751  * end of this file.
3752  */
3754  readBytes = FileRead(file->vfd, rb->outbuf,
3755  sizeof(ReorderBufferDiskChange),
3757 
3758  /* eof */
3759  if (readBytes == 0)
3760  {
3761  FileClose(*fd);
3762  *fd = -1;
3763  (*segno)++;
3764  continue;
3765  }
3766  else if (readBytes < 0)
3767  ereport(ERROR,
3769  errmsg("could not read from reorderbuffer spill file: %m")));
3770  else if (readBytes != sizeof(ReorderBufferDiskChange))
3771  ereport(ERROR,
3773  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3774  readBytes,
3775  (uint32) sizeof(ReorderBufferDiskChange))));
3776 
3777  file->curOffset += readBytes;
3778 
3779  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3780 
3782  sizeof(ReorderBufferDiskChange) + ondisk->size);
3783  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3784 
3785  readBytes = FileRead(file->vfd,
3786  rb->outbuf + sizeof(ReorderBufferDiskChange),
3787  ondisk->size - sizeof(ReorderBufferDiskChange),
3788  file->curOffset,
3790 
3791  if (readBytes < 0)
3792  ereport(ERROR,
3794  errmsg("could not read from reorderbuffer spill file: %m")));
3795  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
3796  ereport(ERROR,
3798  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3799  readBytes,
3800  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
3801 
3802  file->curOffset += readBytes;
3803 
3804  /*
3805  * ok, read a full change from disk, now restore it into proper
3806  * in-memory format
3807  */
3808  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
3809  restored++;
3810  }
3811 
3812  return restored;
3813 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1464
int wal_segment_size
Definition: xlog.c:117
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1267
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:714
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:429
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:862
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:155
struct ReorderBufferDiskChange ReorderBufferDiskChange
void FileClose(File file)
Definition: fd.c:1853
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:800
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:528
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:902
static const Size max_changes_in_memory
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2000
int File
Definition: fd.h:49
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3990 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

3991 {
3992  XLogSegNo first;
3993  XLogSegNo cur;
3994  XLogSegNo last;
3995 
3998 
3999  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4000  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4001 
4002  /* iterate over all possible filenames, and delete them */
4003  for (cur = first; cur <= last; cur++)
4004  {
4005  char path[MAXPGPATH];
4006 
4008  if (unlink(path) != 0 && errno != ENOENT)
4009  ereport(ERROR,
4011  errmsg("could not remove file \"%s\": %m", path)));
4012  }
4013 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:117
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:714
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:155
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:800
int errmsg(const char *fmt,...)
Definition: elog.c:902
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 459 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

461 {
462  /* update memory accounting info */
463  if (upd_mem)
464  ReorderBufferChangeMemoryUpdate(rb, change, false);
465 
466  /* free contained data */
467  switch (change->action)
468  {
473  if (change->data.tp.newtuple)
474  {
475  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
476  change->data.tp.newtuple = NULL;
477  }
478 
479  if (change->data.tp.oldtuple)
480  {
481  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
482  change->data.tp.oldtuple = NULL;
483  }
484  break;
486  if (change->data.msg.prefix != NULL)
487  pfree(change->data.msg.prefix);
488  change->data.msg.prefix = NULL;
489  if (change->data.msg.message != NULL)
490  pfree(change->data.msg.message);
491  change->data.msg.message = NULL;
492  break;
494  if (change->data.inval.invalidations)
495  pfree(change->data.inval.invalidations);
496  change->data.inval.invalidations = NULL;
497  break;
499  if (change->data.snapshot)
500  {
501  ReorderBufferFreeSnap(rb, change->data.snapshot);
502  change->data.snapshot = NULL;
503  }
504  break;
505  /* no data in addition to the struct itself */
507  if (change->data.truncate.relids != NULL)
508  {
509  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
510  change->data.truncate.relids = NULL;
511  }
512  break;
516  break;
517  }
518 
519  pfree(change);
520 }
union ReorderBufferChange::@98 data
struct ReorderBufferChange::@98::@103 inval
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void pfree(void *pointer)
Definition: mcxt.c:1057
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
struct ReorderBufferChange::@98::@100 truncate
struct ReorderBufferChange::@98::@99 tp
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
struct ReorderBufferChange::@98::@101 msg

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 579 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

580 {
581  pfree(relids);
582 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 548 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

549 {
550  pfree(tuple);
551 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 414 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

415 {
416  /* clean the lookup cache if we were cached (quite likely) */
417  if (rb->by_txn_last_xid == txn->xid)
418  {
420  rb->by_txn_last_txn = NULL;
421  }
422 
423  /* free data that's contained */
424 
425  if (txn->tuplecid_hash != NULL)
426  {
428  txn->tuplecid_hash = NULL;
429  }
430 
431  if (txn->invalidations)
432  {
433  pfree(txn->invalidations);
434  txn->invalidations = NULL;
435  }
436 
437  pfree(txn);
438 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:827
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1057
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 1877 of file reorderbuffer.c.

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

1879 {
1880  txn->command_id = command_id;
1881 
1882  /* Avoid copying if it's already copied. */
1883  if (snapshot_now->copied)
1884  txn->snapshot_now = snapshot_now;
1885  else
1886  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1887  txn, command_id);
1888 }
bool copied
Definition: snapshot.h:185
CommandId command_id
Snapshot snapshot_now
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 3236 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

3238 {
3239  ReorderBufferDiskChange *ondisk;
3240  Size sz = sizeof(ReorderBufferDiskChange);
3241 
3243 
3244  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3245  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3246 
3247  switch (change->action)
3248  {
3249  /* fall through these, they're all similar enough */
3254  {
3255  char *data;
3256  ReorderBufferTupleBuf *oldtup,
3257  *newtup;
3258  Size oldlen = 0;
3259  Size newlen = 0;
3260 
3261  oldtup = change->data.tp.oldtuple;
3262  newtup = change->data.tp.newtuple;
3263 
3264  if (oldtup)
3265  {
3266  sz += sizeof(HeapTupleData);
3267  oldlen = oldtup->tuple.t_len;
3268  sz += oldlen;
3269  }
3270 
3271  if (newtup)
3272  {
3273  sz += sizeof(HeapTupleData);
3274  newlen = newtup->