PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirm(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

◆ IsSpecConfirm

◆ IsSpecInsert

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4395 of file reorderbuffer.c.

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, sprintf, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

4396 {
4397  char path[MAXPGPATH];
4398  int fd;
4399  int readBytes;
4401 
4402  sprintf(path, "pg_logical/mappings/%s", fname);
4403  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4404  if (fd < 0)
4405  ereport(ERROR,
4407  errmsg("could not open file \"%s\": %m", path)));
4408 
4409  while (true)
4410  {
4413  ReorderBufferTupleCidEnt *new_ent;
4414  bool found;
4415 
4416  /* be careful about padding */
4417  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4418 
4419  /* read all mappings till the end of the file */
4421  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4423 
4424  if (readBytes < 0)
4425  ereport(ERROR,
4427  errmsg("could not read file \"%s\": %m",
4428  path)));
4429  else if (readBytes == 0) /* EOF */
4430  break;
4431  else if (readBytes != sizeof(LogicalRewriteMappingData))
4432  ereport(ERROR,
4434  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4435  path, readBytes,
4436  (int32) sizeof(LogicalRewriteMappingData))));
4437 
4438  key.relnode = map.old_node;
4439  ItemPointerCopy(&map.old_tid,
4440  &key.tid);
4441 
4442 
4443  ent = (ReorderBufferTupleCidEnt *)
4444  hash_search(tuplecid_data,
4445  (void *) &key,
4446  HASH_FIND,
4447  NULL);
4448 
4449  /* no existing mapping, no need to update */
4450  if (!ent)
4451  continue;
4452 
4453  key.relnode = map.new_node;
4454  ItemPointerCopy(&map.new_tid,
4455  &key.tid);
4456 
4457  new_ent = (ReorderBufferTupleCidEnt *)
4458  hash_search(tuplecid_data,
4459  (void *) &key,
4460  HASH_ENTER,
4461  &found);
4462 
4463  if (found)
4464  {
4465  /*
4466  * Make sure the existing mapping makes sense. We sometime update
4467  * old records that did not yet have a cmax (e.g. pg_class' own
4468  * entry while rewriting it) during rewrites, so allow that.
4469  */
4470  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4471  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4472  }
4473  else
4474  {
4475  /* update mapping */
4476  new_ent->cmin = ent->cmin;
4477  new_ent->cmax = ent->cmax;
4478  new_ent->combocid = ent->combocid;
4479  }
4480  }
4481 
4482  if (CloseTransientFile(fd) != 0)
4483  ereport(ERROR,
4485  errmsg("could not close file \"%s\": %m", path)));
4486 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1211
signed int int32
Definition: c.h:362
#define sprintf
Definition: port.h:195
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1386
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define InvalidCommandId
Definition: c.h:537
#define ereport(elevel,...)
Definition: elog.h:144
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define Assert(condition)
Definition: c.h:745
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1362
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 892 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

893 {
894 #ifdef USE_ASSERT_CHECKING
895  dlist_iter iter;
896  XLogRecPtr prev_lsn = txn->first_lsn;
897 
898  dlist_foreach(iter, &txn->changes)
899  {
900  ReorderBufferChange *cur_change;
901 
902  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
903 
905  Assert(cur_change->lsn != InvalidXLogRecPtr);
906  Assert(txn->first_lsn <= cur_change->lsn);
907 
908  if (txn->end_lsn != InvalidXLogRecPtr)
909  Assert(cur_change->lsn <= txn->end_lsn);
910 
911  Assert(prev_lsn <= cur_change->lsn);
912 
913  prev_lsn = cur_change->lsn;
914  }
915 #endif
916 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
XLogRecPtr end_lsn

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 835 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

836 {
837 #ifdef USE_ASSERT_CHECKING
838  dlist_iter iter;
839  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
840  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
841 
842  dlist_foreach(iter, &rb->toplevel_by_lsn)
843  {
845  iter.cur);
846 
847  /* start LSN must be set */
848  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
849 
850  /* If there is an end LSN, it must be higher than start LSN */
851  if (cur_txn->end_lsn != InvalidXLogRecPtr)
852  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
853 
854  /* Current initial LSN must be strictly higher than previous */
855  if (prev_first_lsn != InvalidXLogRecPtr)
856  Assert(prev_first_lsn < cur_txn->first_lsn);
857 
858  /* known-as-subtxn txns must not be listed */
859  Assert(!rbtxn_is_known_subxact(cur_txn));
860 
861  prev_first_lsn = cur_txn->first_lsn;
862  }
863 
865  {
867  base_snapshot_node,
868  iter.cur);
869 
870  /* base snapshot (and its LSN) must be set */
871  Assert(cur_txn->base_snapshot != NULL);
873 
874  /* current LSN must be strictly higher than previous */
875  if (prev_base_snap_lsn != InvalidXLogRecPtr)
876  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
877 
878  /* known-as-subtxn txns must not be listed */
879  Assert(!rbtxn_is_known_subxact(cur_txn));
880 
881  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
882  }
883 #endif
884 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr base_snapshot_lsn
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
XLogRecPtr end_lsn
#define rbtxn_is_known_subxact(txn)

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 4503 of file reorderbuffer.c.

References lfirst, and RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

4504 {
4507 
4508  if (a->lsn < b->lsn)
4509  return -1;
4510  else if (a->lsn > b->lsn)
4511  return 1;
4512  return 0;
4513 }
#define lfirst(lc)
Definition: pg_list.h:190

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2468 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeAbort().

2469 {
2470  ReorderBufferTXN *txn;
2471 
2472  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2473  false);
2474 
2475  /* unknown, nothing to remove */
2476  if (txn == NULL)
2477  return;
2478 
2479  /* For streamed transactions notify the remote node about the abort. */
2480  if (rbtxn_is_streamed(txn))
2481  {
2482  rb->stream_abort(rb, txn, lsn);
2483 
2484  /*
2485  * We might have decoded changes for this transaction that could load
2486  * the cache as per the current transaction's view (consider DDL's
2487  * happened in this transaction). We don't want the decoding of future
2488  * transactions to use those cache entries so execute invalidations.
2489  */
2490  if (txn->ninvalidations > 0)
2492  txn->invalidations);
2493  }
2494 
2495  /* cosmetic... */
2496  txn->final_lsn = lsn;
2497 
2498  /* remove potential on-disk data, and deallocate */
2499  ReorderBufferCleanupTXN(rb, txn);
2500 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2510 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

2511 {
2512  dlist_mutable_iter it;
2513 
2514  /*
2515  * Iterate through all (potential) toplevel TXNs and abort all that are
2516  * older than what possibly can be running. Once we've found the first
2517  * that is alive we stop, there might be some that acquired an xid earlier
2518  * but started writing later, but it's unlikely and they will be cleaned
2519  * up in a later call to this function.
2520  */
2522  {
2523  ReorderBufferTXN *txn;
2524 
2525  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2526 
2527  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2528  {
2529  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2530 
2531  /* remove potential on-disk data, and deallocate this tx */
2532  ReorderBufferCleanupTXN(rb, txn);
2533  }
2534  else
2535  return;
2536  }
2537 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog(elevel,...)
Definition: elog.h:214

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 2811 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, ReorderBufferTXNByXid(), repalloc(), ReorderBufferTXN::toptxn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeXactOp().

2814 {
2815  ReorderBufferTXN *txn;
2816 
2817  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2818 
2819  /*
2820  * We collect all the invalidations under the top transaction so that we
2821  * can execute them all together.
2822  */
2823  if (txn->toptxn)
2824  txn = txn->toptxn;
2825 
2826  Assert(nmsgs > 0);
2827 
2828  /* Accumulate invalidations. */
2829  if (txn->ninvalidations == 0)
2830  {
2831  txn->ninvalidations = nmsgs;
2834  sizeof(SharedInvalidationMessage) * nmsgs);
2835  memcpy(txn->invalidations, msgs,
2836  sizeof(SharedInvalidationMessage) * nmsgs);
2837  }
2838  else
2839  {
2842  (txn->ninvalidations + nmsgs));
2843 
2844  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2845  nmsgs * sizeof(SharedInvalidationMessage));
2846  txn->ninvalidations += nmsgs;
2847  }
2848 }
struct ReorderBufferTXN * toptxn
MemoryContext context
#define Assert(condition)
Definition: c.h:745
SharedInvalidationMessage * invalidations
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 2691 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

2693 {
2695 
2696  change->data.command_id = cid;
2698 
2699  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2700 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@99 data

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 2778 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessNewCid().

2782 {
2784  ReorderBufferTXN *txn;
2785 
2786  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2787 
2788  change->data.tuplecid.node = node;
2789  change->data.tuplecid.tid = tid;
2790  change->data.tuplecid.cmin = cmin;
2791  change->data.tuplecid.cmax = cmax;
2792  change->data.tuplecid.combocid = combocid;
2793  change->lsn = lsn;
2794  change->txn = txn;
2796 
2797  dlist_push_tail(&txn->tuplecids, &change->node);
2798  txn->ntuplecids++;
2799 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@99::@103 tuplecid
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
union ReorderBufferChange::@99 data
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 2642 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

2644 {
2646 
2647  change->data.snapshot = snap;
2649 
2650  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2651 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@99 data

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 298 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::toplevel_by_lsn, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
308  "ReorderBuffer",
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
326  sizeof(ReorderBufferTXN));
327 
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
331 
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338 
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
347 
348  dlist_init(&buffer->toplevel_by_lsn);
350 
351  /*
352  * Ensure there's no stale data from prior uses of this slot, in case some
353  * prior exit avoided calling ReorderBufferFree. Failure to do this can
354  * produce duplicated txns, and it's very cheap if there's nothing there.
355  */
357 
358  return buffer;
359 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
uint32 TransactionId
Definition: c.h:520
MemoryContext hcxt
Definition: hsearch.h:77
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:72
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext change_context
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
dlist_head txns_by_base_snapshot_lsn
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:71
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:745
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
#define NameStr(name)
Definition: c.h:622
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext tup_context
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext txn_context

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1809 of file reorderbuffer.c.

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

1812 {
1813  if (streaming)
1814  rb->stream_change(rb, txn, relation, change);
1815  else
1816  rb->apply_change(rb, txn, relation, change);
1817 }
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamChangeCB stream_change

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1837 of file reorderbuffer.c.

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

1839 {
1840  if (streaming)
1841  rb->stream_message(rb, txn, change->lsn, true,
1842  change->data.msg.prefix,
1843  change->data.msg.message_size,
1844  change->data.msg.message);
1845  else
1846  rb->message(rb, txn, change->lsn, true,
1847  change->data.msg.prefix,
1848  change->data.msg.message_size,
1849  change->data.msg.message);
1850 }
struct ReorderBufferChange::@99::@102 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message
union ReorderBufferChange::@99 data

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1823 of file reorderbuffer.c.

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

1826 {
1827  if (streaming)
1828  rb->stream_truncate(rb, txn, nrelations, relations, change);
1829  else
1830  rb->apply_truncate(rb, txn, nrelations, relations, change);
1831 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 978 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::txn_flags, and ReorderBufferTXNByIdEnt::xid.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

980 {
981  ReorderBufferTXN *txn;
982  ReorderBufferTXN *subtxn;
983  bool new_top;
984  bool new_sub;
985 
986  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
987  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
988 
989  if (!new_sub)
990  {
991  if (rbtxn_is_known_subxact(subtxn))
992  {
993  /* already associated, nothing to do */
994  return;
995  }
996  else
997  {
998  /*
999  * We already saw this transaction, but initially added it to the
1000  * list of top-level txns. Now that we know it's not top-level,
1001  * remove it from there.
1002  */
1003  dlist_delete(&subtxn->node);
1004  }
1005  }
1006 
1007  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1008  subtxn->toplevel_xid = xid;
1009  Assert(subtxn->nsubtxns == 0);
1010 
1011  /* set the reference to top-level transaction */
1012  subtxn->toptxn = txn;
1013 
1014  /* add to subtransaction list */
1015  dlist_push_tail(&txn->subtxns, &subtxn->node);
1016  txn->nsubtxns++;
1017 
1018  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1019  ReorderBufferTransferSnapToParent(txn, subtxn);
1020 
1021  /* Verify LSN-ordering invariant */
1022  AssertTXNLsnOrder(rb);
1023 }
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define RBTXN_IS_SUBXACT
struct ReorderBufferTXN * toptxn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:745
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_known_subxact(txn)
TransactionId toplevel_xid

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1594 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

1595 {
1596  dlist_iter iter;
1597  HASHCTL hash_ctl;
1598 
1600  return;
1601 
1602  memset(&hash_ctl, 0, sizeof(hash_ctl));
1603 
1604  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1605  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1606  hash_ctl.hcxt = rb->context;
1607 
1608  /*
1609  * create the hash with the exact number of to-be-stored tuplecids from
1610  * the start
1611  */
1612  txn->tuplecid_hash =
1613  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1615 
1616  dlist_foreach(iter, &txn->tuplecids)
1617  {
1620  bool found;
1621  ReorderBufferChange *change;
1622 
1623  change = dlist_container(ReorderBufferChange, node, iter.cur);
1624 
1626 
1627  /* be careful about padding */
1628  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1629 
1630  key.relnode = change->data.tuplecid.node;
1631 
1632  ItemPointerCopy(&change->data.tuplecid.tid,
1633  &key.tid);
1634 
1635  ent = (ReorderBufferTupleCidEnt *)
1637  (void *) &key,
1639  &found);
1640  if (!found)
1641  {
1642  ent->cmin = change->data.tuplecid.cmin;
1643  ent->cmax = change->data.tuplecid.cmax;
1644  ent->combocid = change->data.tuplecid.combocid;
1645  }
1646  else
1647  {
1648  /*
1649  * Maybe we already saw this tuple before in this transaction, but
1650  * if so it must have the same cmin.
1651  */
1652  Assert(ent->cmin == change->data.tuplecid.cmin);
1653 
1654  /*
1655  * cmax may be initially invalid, but once set it can only grow,
1656  * and never become invalid again.
1657  */
1658  Assert((ent->cmax == InvalidCommandId) ||
1659  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1660  (change->data.tuplecid.cmax > ent->cmax)));
1661  ent->cmax = change->data.tuplecid.cmax;
1662  }
1663  }
1664 }
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
MemoryContext hcxt
Definition: hsearch.h:77
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
struct ReorderBufferChange::@99::@103 tuplecid
Size entrysize
Definition: hsearch.h:72
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:86
MemoryContext context
#define InvalidCommandId
Definition: c.h:537
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
union ReorderBufferChange::@99 data
Size keysize
Definition: hsearch.h:71
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_has_catalog_changes(txn)
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3391 of file reorderbuffer.c.

References Assert, XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

3392 {
3394  SnapBuild *builder = ctx->snapshot_builder;
3395 
3396  /*
3397  * We can't start streaming immediately even if the streaming is enabled
3398  * because we previously decoded this transaction and now just are
3399  * restarting.
3400  */
3401  if (ReorderBufferCanStream(rb) &&
3402  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3403  {
3404  /* We must have a consistent snapshot by this time */
3406  return true;
3407  }
3408 
3409  return false;
3410 }
void * private_data
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:404
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:395
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct SnapBuild * snapshot_builder
Definition: logical.h:43
#define Assert(condition)
Definition: c.h:745
XLogReaderState * reader
Definition: logical.h:41

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition 
)
static

Definition at line 2716 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferCanStream(), ReorderBufferChangeSize(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

2719 {
2720  Size sz;
2721  ReorderBufferTXN *txn;
2722  ReorderBufferTXN *toptxn = NULL;
2723 
2724  Assert(change->txn);
2725 
2726  /*
2727  * Ignore tuple CID changes, because those are not evicted when reaching
2728  * memory limit. So we just don't count them, because it might easily
2729  * trigger a pointless attempt to spill.
2730  */
2732  return;
2733 
2734  txn = change->txn;
2735 
2736  /* If streaming supported, update the total size in top level as well. */
2737  if (ReorderBufferCanStream(rb))
2738  {
2739  if (txn->toptxn != NULL)
2740  toptxn = txn->toptxn;
2741  else
2742  toptxn = txn;
2743  }
2744 
2745  sz = ReorderBufferChangeSize(change);
2746 
2747  if (addition)
2748  {
2749  txn->size += sz;
2750  rb->size += sz;
2751 
2752  /* Update the total size in the top transaction. */
2753  if (toptxn)
2754  toptxn->total_size += sz;
2755  }
2756  else
2757  {
2758  Assert((rb->size >= sz) && (txn->size >= sz));
2759  txn->size -= sz;
2760  rb->size -= sz;
2761 
2762  /* Update the total size in the top transaction. */
2763  if (toptxn)
2764  toptxn->total_size -= sz;
2765  }
2766 
2767  Assert(txn->size <= rb->size);
2768 }
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
#define Assert(condition)
Definition: c.h:745
size_t Size
Definition: c.h:473

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 3514 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferChangeMemoryUpdate().

3515 {
3516  Size sz = sizeof(ReorderBufferChange);
3517 
3518  switch (change->action)
3519  {
3520  /* fall through these, they're all similar enough */
3525  {
3526  ReorderBufferTupleBuf *oldtup,
3527  *newtup;
3528  Size oldlen = 0;
3529  Size newlen = 0;
3530 
3531  oldtup = change->data.tp.oldtuple;
3532  newtup = change->data.tp.newtuple;
3533 
3534  if (oldtup)
3535  {
3536  sz += sizeof(HeapTupleData);
3537  oldlen = oldtup->tuple.t_len;
3538  sz += oldlen;
3539  }
3540 
3541  if (newtup)
3542  {
3543  sz += sizeof(HeapTupleData);
3544  newlen = newtup->tuple.t_len;
3545  sz += newlen;
3546  }
3547 
3548  break;
3549  }
3551  {
3552  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3553 
3554  sz += prefix_size + change->data.msg.message_size +
3555  sizeof(Size) + sizeof(Size);
3556 
3557  break;
3558  }
3560  {
3561  Snapshot snap;
3562 
3563  snap = change->data.snapshot;
3564 
3565  sz += sizeof(SnapshotData) +
3566  sizeof(TransactionId) * snap->xcnt +
3567  sizeof(TransactionId) * snap->subxcnt;
3568 
3569  break;
3570  }
3572  {
3573  sz += sizeof(Oid) * change->data.truncate.nrelids;
3574 
3575  break;
3576  }
3580  /* ReorderBufferChange contains everything important */
3581  break;
3582  }
3583 
3584  return sz;
3585 }
struct ReorderBufferChange::@99::@101 truncate
uint32 TransactionId
Definition: c.h:520
struct ReorderBufferChange::@99::@102 msg
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
uint32 t_len
Definition: htup.h:64
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
union ReorderBufferChange::@99 data
struct ReorderBufferChange::@99::@100 tp
size_t Size
Definition: c.h:473
uint32 xcnt
Definition: snapshot.h:169
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3043 of file reorderbuffer.c.

References Assert, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferQueueChange().

3044 {
3045  ReorderBufferTXN *txn;
3046 
3047  /* bail out if we haven't exceeded the memory limit */
3048  if (rb->size < logical_decoding_work_mem * 1024L)
3049  return;
3050 
3051  /*
3052  * Loop until we reach under the memory limit. One might think that just
3053  * by evicting the largest (sub)transaction we will come under the memory
3054  * limit based on assumption that the selected transaction is at least as
3055  * large as the most recent change (which caused us to go over the memory
3056  * limit). However, that is not true because a user can reduce the
3057  * logical_decoding_work_mem to a smaller value before the most recent
3058  * change.
3059  */
3060  while (rb->size >= logical_decoding_work_mem * 1024L)
3061  {
3062  /*
3063  * Pick the largest transaction (or subtransaction) and evict it from
3064  * memory by streaming, if possible. Otherwise, spill to disk.
3065  */
3067  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3068  {
3069  /* we know there has to be one, because the size is not zero */
3070  Assert(txn && !txn->toptxn);
3071  Assert(txn->total_size > 0);
3072  Assert(rb->size >= txn->total_size);
3073 
3074  ReorderBufferStreamTXN(rb, txn);
3075  }
3076  else
3077  {
3078  /*
3079  * Pick the largest transaction (or subtransaction) and evict it
3080  * from memory by serializing it to disk.
3081  */
3082  txn = ReorderBufferLargestTXN(rb);
3083 
3084  /* we know there has to be one, because the size is not zero */
3085  Assert(txn);
3086  Assert(txn->size > 0);
3087  Assert(rb->size >= txn->size);
3088 
3089  ReorderBufferSerializeTXN(rb, txn);
3090  }
3091 
3092  /*
3093  * After eviction, the transaction should have no entries in memory,
3094  * and should use 0 bytes for changes.
3095  */
3096  Assert(txn->size == 0);
3097  Assert(txn->nentries_mem == 0);
3098  }
3099 
3100  /* We must be under the memory limit now. */
3101  Assert(rb->size < logical_decoding_work_mem * 1024L);
3102 }
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:745
int logical_decoding_work_mem
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 3916 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

3917 {
3918  DIR *spill_dir;
3919  struct dirent *spill_de;
3920  struct stat statbuf;
3921  char path[MAXPGPATH * 2 + 12];
3922 
3923  sprintf(path, "pg_replslot/%s", slotname);
3924 
3925  /* we're only handling directories here, skip if it's not ours */
3926  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3927  return;
3928 
3929  spill_dir = AllocateDir(path);
3930  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3931  {
3932  /* only look at names that can be ours */
3933  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3934  {
3935  snprintf(path, sizeof(path),
3936  "pg_replslot/%s/%s", slotname,
3937  spill_de->d_name);
3938 
3939  if (unlink(path) != 0)
3940  ereport(ERROR,
3942  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3943  path, slotname)));
3944  }
3945  }
3946  FreeDir(spill_dir);
3947 }
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2664
#define INFO
Definition: elog.h:33
Definition: dirent.h:9
#define sprintf
Definition: port.h:195
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2583
#define stat(a, b)
Definition: win32_port.h:255
#define ereport(elevel,...)
Definition: elog.h:144
#define S_ISDIR(m)
Definition: win32_port.h:296
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:824
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:193
int FreeDir(DIR *dir)
Definition: fd.c:2701

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1408 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), ReorderBufferForget(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

1409 {
1410  bool found;
1411  dlist_mutable_iter iter;
1412 
1413  /* cleanup subtransactions & their changes */
1414  dlist_foreach_modify(iter, &txn->subtxns)
1415  {
1416  ReorderBufferTXN *subtxn;
1417 
1418  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1419 
1420  /*
1421  * Subtransactions are always associated to the toplevel TXN, even if
1422  * they originally were happening inside another subtxn, so we won't
1423  * ever recurse more than one level deep here.
1424  */
1425  Assert(rbtxn_is_known_subxact(subtxn));
1426  Assert(subtxn->nsubtxns == 0);
1427 
1428  ReorderBufferCleanupTXN(rb, subtxn);
1429  }
1430 
1431  /* cleanup changes in the toplevel txn */
1432  dlist_foreach_modify(iter, &txn->changes)
1433  {
1434  ReorderBufferChange *change;
1435 
1436  change = dlist_container(ReorderBufferChange, node, iter.cur);
1437 
1438  /* Check we're not mixing changes from different transactions. */
1439  Assert(change->txn == txn);
1440 
1441  ReorderBufferReturnChange(rb, change, true);
1442  }
1443 
1444  /*
1445  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1446  * They are always stored in the toplevel transaction.
1447  */
1448  dlist_foreach_modify(iter, &txn->tuplecids)
1449  {
1450  ReorderBufferChange *change;
1451 
1452  change = dlist_container(ReorderBufferChange, node, iter.cur);
1453 
1454  /* Check we're not mixing changes from different transactions. */
1455  Assert(change->txn == txn);
1457 
1458  ReorderBufferReturnChange(rb, change, true);
1459  }
1460 
1461  /*
1462  * Cleanup the base snapshot, if set.
1463  */
1464  if (txn->base_snapshot != NULL)
1465  {
1468  }
1469 
1470  /*
1471  * Cleanup the snapshot for the last streamed run.
1472  */
1473  if (txn->snapshot_now != NULL)
1474  {
1475  Assert(rbtxn_is_streamed(txn));
1477  }
1478 
1479  /*
1480  * Remove TXN from its containing list.
1481  *
1482  * Note: if txn is known as subxact, we are deleting the TXN from its
1483  * parent's list of known subxacts; this leaves the parent's nsubxacts
1484  * count too high, but we don't care. Otherwise, we are deleting the TXN
1485  * from the LSN-ordered list of toplevel TXNs.
1486  */
1487  dlist_delete(&txn->node);
1488 
1489  /* now remove reference from buffer */
1490  hash_search(rb->by_txn,
1491  (void *) &txn->xid,
1492  HASH_REMOVE,
1493  &found);
1494  Assert(found);
1495 
1496  /* remove entries spilled to disk */
1497  if (rbtxn_is_serialized(txn))
1498  ReorderBufferRestoreCleanup(rb, txn);
1499 
1500  /* deallocate */
1501  ReorderBufferReturnTXN(rb, txn);
1502 }
Snapshot base_snapshot
dlist_node base_snapshot_node
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
Snapshot snapshot_now
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define rbtxn_is_streamed(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define Assert(condition)
Definition: c.h:745
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_known_subxact(txn)
dlist_head tuplecids

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2399 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), ReorderBufferTXNByXid(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2403 {
2404  ReorderBufferTXN *txn;
2405  Snapshot snapshot_now;
2406  CommandId command_id = FirstCommandId;
2407 
2408  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2409  false);
2410 
2411  /* unknown transaction, nothing to replay */
2412  if (txn == NULL)
2413  return;
2414 
2415  txn->final_lsn = commit_lsn;
2416  txn->end_lsn = end_lsn;
2417  txn->commit_time = commit_time;
2418  txn->origin_id = origin_id;
2419  txn->origin_lsn = origin_lsn;
2420 
2421  /*
2422  * If the transaction was (partially) streamed, we need to commit it in a
2423  * 'streamed' way. That is, we first stream the remaining part of the
2424  * transaction, and then invoke stream_commit message.
2425  *
2426  * Called after everything (origin ID, LSN, ...) is stored in the
2427  * transaction to avoid passing that information directly.
2428  */
2429  if (rbtxn_is_streamed(txn))
2430  {
2431  ReorderBufferStreamCommit(rb, txn);
2432  return;
2433  }
2434 
2435  /*
2436  * If this transaction has no snapshot, it didn't make any changes to the
2437  * database, so there's nothing to decode. Note that
2438  * ReorderBufferCommitChild will have transferred any snapshots from
2439  * subtransactions if there were any.
2440  */
2441  if (txn->base_snapshot == NULL)
2442  {
2443  Assert(txn->ninvalidations == 0);
2444  ReorderBufferCleanupTXN(rb, txn);
2445  return;
2446  }
2447 
2448  snapshot_now = txn->base_snapshot;
2449 
2450  /* Process and send the changes to output plugin. */
2451  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2452  command_id, false);
2453 }
uint32 CommandId
Definition: c.h:534
TimestampTz commit_time
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
RepOriginId origin_id
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:536
#define rbtxn_is_streamed(txn)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:745
XLogRecPtr end_lsn
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1098 of file reorderbuffer.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1101 {
1102  ReorderBufferTXN *subtxn;
1103 
1104  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1105  InvalidXLogRecPtr, false);
1106 
1107  /*
1108  * No need to do anything if that subtxn didn't contain any changes
1109  */
1110  if (!subtxn)
1111  return;
1112 
1113  subtxn->final_lsn = commit_lsn;
1114  subtxn->end_lsn = end_lsn;
1115 
1116  /*
1117  * Assign this subxact as a child of the toplevel xact (no-op if already
1118  * done.)
1119  */
1120  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1121 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1672 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

1674 {
1675  Snapshot snap;
1676  dlist_iter iter;
1677  int i = 0;
1678  Size size;
1679 
1680  size = sizeof(SnapshotData) +
1681  sizeof(TransactionId) * orig_snap->xcnt +
1682  sizeof(TransactionId) * (txn->nsubtxns + 1);
1683 
1684  snap = MemoryContextAllocZero(rb->context, size);
1685  memcpy(snap, orig_snap, sizeof(SnapshotData));
1686 
1687  snap->copied = true;
1688  snap->active_count = 1; /* mark as active so nobody frees it */
1689  snap->regd_count = 0;
1690  snap->xip = (TransactionId *) (snap + 1);
1691 
1692  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1693 
1694  /*
1695  * snap->subxip contains all txids that belong to our transaction which we
1696  * need to check via cmin/cmax. That's why we store the toplevel
1697  * transaction in there as well.
1698  */
1699  snap->subxip = snap->xip + snap->xcnt;
1700  snap->subxip[i++] = txn->xid;
1701 
1702  /*
1703  * subxcnt isn't decreased when subtransactions abort, so count manually.
1704  * Since it's an upper boundary it is safe to use it for the allocation
1705  * above.
1706  */
1707  snap->subxcnt = 1;
1708 
1709  dlist_foreach(iter, &txn->subtxns)
1710  {
1711  ReorderBufferTXN *sub_txn;
1712 
1713  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1714  snap->subxip[i++] = sub_txn->xid;
1715  snap->subxcnt++;
1716  }
1717 
1718  /* sort so we can bsearch() later */
1719  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1720 
1721  /* store the specified current CommandId */
1722  snap->curcid = cid;
1723 
1724  return snap;
1725 }
uint32 TransactionId
Definition: c.h:520
bool copied
Definition: snapshot.h:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:205
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
CommandId curcid
Definition: snapshot.h:187
size_t Size
Definition: c.h:473
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:169
int i
#define qsort(a, b, c, d)
Definition: port.h:475
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:204
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139
int32 subxcnt
Definition: snapshot.h:181

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2855 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferProcessTXN().

2856 {
2857  int i;
2858 
2859  for (i = 0; i < txn->ninvalidations; i++)
2861 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2553 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeCommit().

2554 {
2555  ReorderBufferTXN *txn;
2556 
2557  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2558  false);
2559 
2560  /* unknown, nothing to forget */
2561  if (txn == NULL)
2562  return;
2563 
2564  /* For streamed transactions notify the remote node about the abort. */
2565  if (rbtxn_is_streamed(txn))
2566  rb->stream_abort(rb, txn, lsn);
2567 
2568  /* cosmetic... */
2569  txn->final_lsn = lsn;
2570 
2571  /*
2572  * Process cache invalidation messages if there are any. Even if we're not
2573  * interested in the transaction's contents, it could have manipulated the
2574  * catalog and we need to update the caches according to that.
2575  */
2576  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2578  txn->invalidations);
2579  else
2580  Assert(txn->ninvalidations == 0);
2581 
2582  /* remove potential on-disk data, and deallocate */
2583  ReorderBufferCleanupTXN(rb, txn);
2584 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferStreamAbortCB stream_abort
#define rbtxn_is_streamed(txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:745
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 365 of file reorderbuffer.c.

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

366 {
367  MemoryContext context = rb->context;
368 
369  /*
370  * We free separately allocated data by entirely scrapping reorderbuffer's
371  * memory context.
372  */
373  MemoryContextDelete(context);
374 
375  /* Free disk space used by unconsumed reorder buffers */
377 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
ReplicationSlotPersistentData data
Definition: slot.h:143
MemoryContext context
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NameStr(name)
Definition: c.h:622
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1731 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

1732 {
1733  if (snap->copied)
1734  pfree(snap);
1735  else
1737 }
bool copied
Definition: snapshot.h:185
void pfree(void *pointer)
Definition: mcxt.c:1057
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428

◆ ReorderBufferGetChange()

ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 436 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

437 {
438  ReorderBufferChange *change;
439 
440  change = (ReorderBufferChange *)
442 
443  memset(change, 0, sizeof(ReorderBufferChange));
444  return change;
445 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 923 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBufferTXNByIdEnt::txn.

Referenced by SnapBuildProcessRunningXacts().

924 {
925  ReorderBufferTXN *txn;
926 
927  AssertTXNLsnOrder(rb);
928 
930  return NULL;
931 
933 
936  return txn;
937 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define rbtxn_is_known_subxact(txn)

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 951 of file reorderbuffer.c.

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBufferTXNByIdEnt::txn, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

952 {
953  ReorderBufferTXN *txn;
954 
955  AssertTXNLsnOrder(rb);
956 
958  return InvalidTransactionId;
959 
960  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
962  return txn->base_snapshot->xmin;
963 }
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: snapshot.h:157
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 550 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

551 {
552  Oid *relids;
553  Size alloc_len;
554 
555  alloc_len = sizeof(Oid) * nrelids;
556 
557  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
558 
559  return relids;
560 }
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContext context
size_t Size
Definition: c.h:473
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 514 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

515 {
516  ReorderBufferTupleBuf *tuple;
517  Size alloc_len;
518 
519  alloc_len = tuple_len + SizeofHeapTupleHeader;
520 
521  tuple = (ReorderBufferTupleBuf *)
523  sizeof(ReorderBufferTupleBuf) +
524  MAXIMUM_ALIGNOF + alloc_len);
525  tuple->alloc_tuple_size = alloc_len;
526  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
527 
528  return tuple;
529 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleHeader t_data
Definition: htup.h:68
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleData tuple
Definition: reorderbuffer.h:29
size_t Size
Definition: c.h:473
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
MemoryContext tup_context

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 383 of file reorderbuffer.c.

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferTXNByIdEnt::txn, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

384 {
385  ReorderBufferTXN *txn;
386 
387  txn = (ReorderBufferTXN *)
389 
390  memset(txn, 0, sizeof(ReorderBufferTXN));
391 
392  dlist_init(&txn->changes);
393  dlist_init(&txn->tuplecids);
394  dlist_init(&txn->subtxns);
395 
396  /* InvalidCommandId is not zero, so set it explicitly */
398 
399  return txn;
400 }
CommandId command_id
dlist_head changes
#define InvalidCommandId
Definition: c.h:537
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
MemoryContext txn_context
dlist_head tuplecids

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2593 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeXactOp(), ReorderBufferAbort(), and ReorderBufferForget().

2595 {
2596  bool use_subtxn = IsTransactionOrTransactionBlock();
2597  int i;
2598 
2599  if (use_subtxn)
2600  BeginInternalSubTransaction("replay");
2601 
2602  /*
2603  * Force invalidations to happen outside of a valid transaction - that way
2604  * entries will just be marked as invalid without accessing the catalog.
2605  * That's advantageous because we don't need to setup the full state
2606  * necessary for catalog access.
2607  */
2608  if (use_subtxn)
2610 
2611  for (i = 0; i < ninvalidations; i++)
2612  LocalExecuteInvalidationMessage(&invalidations[i]);
2613 
2614  if (use_subtxn)
2616 }
void AbortCurrentTransaction(void)
Definition: xact.c:3211
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4513
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4408
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
int i

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1140 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

1141 {
1143  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1144  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1145 
1146  if (pos_a < pos_b)
1147  return 1;
1148  else if (pos_a == pos_b)
1149  return 0;
1150  return -1;
1151 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1377 of file reorderbuffer.c.

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, FileClose(), ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), ReorderBufferReturnChange(), and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1379 {
1380  int32 off;
1381 
1382  for (off = 0; off < state->nr_txns; off++)
1383  {
1384  if (state->entries[off].file.vfd != -1)
1385  FileClose(state->entries[off].file.vfd);
1386  }
1387 
1388  /* free memory we might have "leaked" in the last *Next call */
1389  if (!dlist_is_empty(&state->old_change))
1390  {
1391  ReorderBufferChange *change;
1392 
1393  change = dlist_container(ReorderBufferChange, node,
1394  dlist_pop_head_node(&state->old_change));
1395  ReorderBufferReturnChange(rb, change, true);
1396  Assert(dlist_is_empty(&state->old_change));
1397  }
1398 
1399  binaryheap_free(state->heap);
1400  pfree(state);
1401 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:362
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
void FileClose(File file)
Definition: fd.c:1826
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1163 of file reorderbuffer.c.

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, ReorderBufferTXNByIdEnt::txn, ReorderBufferIterTXNEntry::txn, and TXNEntryFile::vfd.

Referenced by ReorderBufferProcessTXN().

1165 {
1166  Size nr_txns = 0;
1168  dlist_iter cur_txn_i;
1169  int32 off;
1170 
1171  *iter_state = NULL;
1172 
1173  /* Check ordering of changes in the toplevel transaction. */
1174  AssertChangeLsnOrder(txn);
1175 
1176  /*
1177  * Calculate the size of our heap: one element for every transaction that
1178  * contains changes. (Besides the transactions already in the reorder
1179  * buffer, we count the one we were directly passed.)
1180  */
1181  if (txn->nentries > 0)
1182  nr_txns++;
1183 
1184  dlist_foreach(cur_txn_i, &txn->subtxns)
1185  {
1186  ReorderBufferTXN *cur_txn;
1187 
1188  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1189 
1190  /* Check ordering of changes in this subtransaction. */
1191  AssertChangeLsnOrder(cur_txn);
1192 
1193  if (cur_txn->nentries > 0)
1194  nr_txns++;
1195  }
1196 
1197  /* allocate iteration state */
1198  state = (ReorderBufferIterTXNState *)
1200  sizeof(ReorderBufferIterTXNState) +
1201  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1202 
1203  state->nr_txns = nr_txns;
1204  dlist_init(&state->old_change);
1205 
1206  for (off = 0; off < state->nr_txns; off++)
1207  {
1208  state->entries[off].file.vfd = -1;
1209  state->entries[off].segno = 0;
1210  }
1211 
1212  /* allocate heap */
1213  state->heap = binaryheap_allocate(state->nr_txns,
1215  state);
1216 
1217  /* Now that the state fields are initialized, it is safe to return it. */
1218  *iter_state = state;
1219 
1220  /*
1221  * Now insert items into the binary heap, in an unordered fashion. (We
1222  * will run a heap assembly step at the end; this is more efficient.)
1223  */
1224 
1225  off = 0;
1226 
1227  /* add toplevel transaction if it contains changes */
1228  if (txn->nentries > 0)
1229  {
1230  ReorderBufferChange *cur_change;
1231 
1232  if (rbtxn_is_serialized(txn))
1233  {
1234  /* serialize remaining changes */
1235  ReorderBufferSerializeTXN(rb, txn);
1236  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1237  &state->entries[off].segno);
1238  }
1239 
1240  cur_change = dlist_head_element(ReorderBufferChange, node,
1241  &txn->changes);
1242 
1243  state->entries[off].lsn = cur_change->lsn;
1244  state->entries[off].change = cur_change;
1245  state->entries[off].txn = txn;
1246 
1248  }
1249 
1250  /* add subtransactions if they contain changes */
1251  dlist_foreach(cur_txn_i, &txn->subtxns)
1252  {
1253  ReorderBufferTXN *cur_txn;
1254 
1255  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1256 
1257  if (cur_txn->nentries > 0)
1258  {
1259  ReorderBufferChange *cur_change;
1260 
1261  if (rbtxn_is_serialized(cur_txn))
1262  {
1263  /* serialize remaining changes */
1264  ReorderBufferSerializeTXN(rb, cur_txn);
1265  ReorderBufferRestoreChanges(rb, cur_txn,
1266  &state->entries[off].file,
1267  &state->entries[off].segno);
1268  }
1269  cur_change = dlist_head_element(ReorderBufferChange, node,
1270  &cur_txn->changes);
1271 
1272  state->entries[off].lsn = cur_change->lsn;
1273  state->entries[off].change = cur_change;
1274  state->entries[off].txn = cur_txn;
1275 
1277  }
1278  }
1279 
1280  /* assemble a valid binary heap */
1281  binaryheap_build(state->heap);
1282 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:362
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:473
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:479
#define rbtxn_is_serialized(txn)

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1291 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::file, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

1292 {
1293  ReorderBufferChange *change;
1295  int32 off;
1296 
1297  /* nothing there anymore */
1298  if (state->heap->bh_size == 0)
1299  return NULL;
1300 
1301  off = DatumGetInt32(binaryheap_first(state->heap));
1302  entry = &state->entries[off];
1303 
1304  /* free memory we might have "leaked" in the previous *Next call */
1305  if (!dlist_is_empty(&state->old_change))
1306  {
1307  change = dlist_container(ReorderBufferChange, node,
1308  dlist_pop_head_node(&state->old_change));
1309  ReorderBufferReturnChange(rb, change, true);
1310  Assert(dlist_is_empty(&state->old_change));
1311  }
1312 
1313  change = entry->change;
1314 
1315  /*
1316  * update heap with information about which transaction has the next
1317  * relevant change in LSN order
1318  */
1319 
1320  /* there are in-memory changes */
1321  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1322  {
1323  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1324  ReorderBufferChange *next_change =
1325  dlist_container(ReorderBufferChange, node, next);
1326 
1327  /* txn stays the same */
1328  state->entries[off].lsn = next_change->lsn;
1329  state->entries[off].change = next_change;
1330 
1332  return change;
1333  }
1334 
1335  /* try to load changes from disk */
1336  if (entry->txn->nentries != entry->txn->nentries_mem)
1337  {
1338  /*
1339  * Ugly: restoring changes will reuse *Change records, thus delete the
1340  * current one from the per-tx list and only free in the next call.
1341  */
1342  dlist_delete(&change->node);
1343  dlist_push_tail(&state->old_change, &change->node);
1344 
1345  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1346  &state->entries[off].segno))
1347  {
1348  /* successfully restored changes from disk */
1349  ReorderBufferChange *next_change =
1351  &entry->txn->changes);
1352 
1353  elog(DEBUG2, "restored %u/%u changes from disk",
1354  (uint32) entry->txn->nentries_mem,
1355  (uint32) entry->txn->nentries);
1356 
1357  Assert(entry->txn->nentries_mem);
1358  /* txn stays the same */
1359  state->entries[off].lsn = next_change->lsn;
1360  state->entries[off].change = next_change;
1362 
1363  return change;
1364  }
1365  }
1366 
1367  /* ok, no changes there anymore, remove */
1368  binaryheap_remove_first(state->heap);
1369 
1370  return change;
1371 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:219
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
#define DatumGetInt32(X)
Definition: postgres.h:472
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
signed int int32
Definition: c.h:362
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:374
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:479
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
#define elog(elevel,...)
Definition: elog.h:214
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3008 of file reorderbuffer.c.

References dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_incomplete_tuple, ReorderBuffer::toplevel_by_lsn, ReorderBufferTXN::total_size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

3009 {
3010  dlist_iter iter;
3011  Size largest_size = 0;
3012  ReorderBufferTXN *largest = NULL;
3013 
3014  /* Find the largest top-level transaction. */
3015  dlist_foreach(iter, &rb->toplevel_by_lsn)
3016  {
3017  ReorderBufferTXN *txn;
3018 
3019  txn = dlist_container(ReorderBufferTXN, node, iter.cur);
3020 
3021  if ((largest != NULL || txn->total_size > largest_size) &&
3022  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3023  {
3024  largest = txn;
3025  largest_size = txn->total_size;
3026  }
3027  }
3028 
3029  return largest;
3030 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define rbtxn_has_incomplete_tuple(txn)
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
size_t Size
Definition: c.h:473

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 2964 of file reorderbuffer.c.

References Assert, ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

2965 {
2966  HASH_SEQ_STATUS hash_seq;
2968  ReorderBufferTXN *largest = NULL;
2969 
2970  hash_seq_init(&hash_seq, rb->by_txn);
2971  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2972  {
2973  ReorderBufferTXN *txn = ent->txn;
2974 
2975  /* if the current transaction is larger, remember it */
2976  if ((!largest) || (txn->size > largest->size))
2977  largest = txn;
2978  }
2979 
2980  Assert(largest);
2981  Assert(largest->size > 0);
2982  Assert(largest->size <= rb->size);
2983 
2984  return largest;
2985 }
#define Assert(condition)
Definition: c.h:745
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391
ReorderBufferTXN * txn

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 666 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, IsInsertOrUpdate, IsSpecConfirm, IsSpecInsert, rbtxn_has_incomplete_tuple, RBTXN_HAS_SPEC_INSERT, rbtxn_has_spec_insert, RBTXN_HAS_TOAST_INSERT, rbtxn_has_toast_insert, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

669 {
670  ReorderBufferTXN *toptxn;
671 
672  /*
673  * The partial changes need to be processed only while streaming
674  * in-progress transactions.
675  */
676  if (!ReorderBufferCanStream(rb))
677  return;
678 
679  /* Get the top transaction. */
680  if (txn->toptxn != NULL)
681  toptxn = txn->toptxn;
682  else
683  toptxn = txn;
684 
685  /*
686  * Set the toast insert bit whenever we get toast insert to indicate a
687  * partial change and clear it when we get the insert or update on main
688  * table (Both update and insert will do the insert in the toast table).
689  */
690  if (toast_insert)
692  else if (rbtxn_has_toast_insert(toptxn) &&
693  IsInsertOrUpdate(change->action))
694  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
695 
696  /*
697  * Set the spec insert bit whenever we get the speculative insert to
698  * indicate the partial change and clear the same on speculative confirm.
699  */
700  if (IsSpecInsert(change->action))
701  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
702  else if (IsSpecConfirm(change->action))
703  {
704  /*
705  * Speculative confirm change must be preceded by speculative
706  * insertion.
707  */
708  Assert(rbtxn_has_spec_insert(toptxn));
709  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
710  }
711 
712  /*
713  * Stream the transaction if it is serialized before and the changes are
714  * now complete in the top-level transaction.
715  *
716  * The reason for doing the streaming of such a transaction as soon as we
717  * get the complete change for it is that previously it would have reached
718  * the memory threshold and wouldn't get streamed because of incomplete
719  * changes. Delaying such transactions would increase apply lag for them.
720  */
722  !(rbtxn_has_incomplete_tuple(toptxn)) &&
723  rbtxn_is_serialized(txn))
724  ReorderBufferStreamTXN(rb, toptxn);
725 }
#define RBTXN_HAS_TOAST_INSERT
#define rbtxn_has_incomplete_tuple(txn)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_toast_insert(txn)
#define IsSpecInsert(action)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
struct ReorderBufferTXN * toptxn
#define rbtxn_has_spec_insert(txn)
#define Assert(condition)
Definition: c.h:745
#define IsSpecConfirm(action)
#define IsInsertOrUpdate(action)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define RBTXN_HAS_SPEC_INSERT
#define rbtxn_is_serialized(txn)

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 1915 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCommit(), and ReorderBufferStreamTXN().

1920 {
1921  bool using_subtxn;
1923  ReorderBufferIterTXNState *volatile iterstate = NULL;
1924  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
1925  ReorderBufferChange *volatile specinsert = NULL;
1926  volatile bool stream_started = false;
1927  ReorderBufferTXN *volatile curtxn = NULL;
1928 
1929  /* build data to be able to lookup the CommandIds of catalog tuples */
1931 
1932  /* setup the initial snapshot */
1933  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1934 
1935  /*
1936  * Decoding needs access to syscaches et al., which in turn use
1937  * heavyweight locks and such. Thus we need to have enough state around to
1938  * keep track of those. The easiest way is to simply use a transaction
1939  * internally. That also allows us to easily enforce that nothing writes
1940  * to the database by checking for xid assignments.
1941  *
1942  * When we're called via the SQL SRF there's already a transaction
1943  * started, so start an explicit subtransaction there.
1944  */
1945  using_subtxn = IsTransactionOrTransactionBlock();
1946 
1947  PG_TRY();
1948  {
1949  ReorderBufferChange *change;
1950 
1951  if (using_subtxn)
1952  BeginInternalSubTransaction(streaming ? "stream" : "replay");
1953  else
1955 
1956  /* We only need to send begin/commit for non-streamed transactions. */
1957  if (!streaming)
1958  rb->begin(rb, txn);
1959 
1960  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1961  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1962  {
1963  Relation relation = NULL;
1964  Oid reloid;
1965 
1966  /*
1967  * We can't call start stream callback before processing first
1968  * change.
1969  */
1970  if (prev_lsn == InvalidXLogRecPtr)
1971  {
1972  if (streaming)
1973  {
1974  txn->origin_id = change->origin_id;
1975  rb->stream_start(rb, txn, change->lsn);
1976  stream_started = true;
1977  }
1978  }
1979 
1980  /*
1981  * Enforce correct ordering of changes, merged from multiple
1982  * subtransactions. The changes may have the same LSN due to
1983  * MULTI_INSERT xlog records.
1984  */
1985  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
1986 
1987  prev_lsn = change->lsn;
1988 
1989  /* Set the current xid to detect concurrent aborts. */
1990  if (streaming)
1991  {
1992  curtxn = change->txn;
1993  SetupCheckXidLive(curtxn->xid);
1994  }
1995 
1996  switch (change->action)
1997  {
1999 
2000  /*
2001  * Confirmation for speculative insertion arrived. Simply
2002  * use as a normal record. It'll be cleaned up at the end
2003  * of INSERT processing.
2004  */
2005  if (specinsert == NULL)
2006  elog(ERROR, "invalid ordering of speculative insertion changes");
2007  Assert(specinsert->data.tp.oldtuple == NULL);
2008  change = specinsert;
2010 
2011  /* intentionally fall through */
2015  Assert(snapshot_now);
2016 
2017  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2018  change->data.tp.relnode.relNode);
2019 
2020  /*
2021  * Mapped catalog tuple without data, emitted while
2022  * catalog table was in the process of being rewritten. We
2023  * can fail to look up the relfilenode, because the
2024  * relmapper has no "historic" view, in contrast to normal
2025  * the normal catalog during decoding. Thus repeated
2026  * rewrites can cause a lookup failure. That's OK because
2027  * we do not decode catalog changes anyway. Normally such
2028  * tuples would be skipped over below, but we can't
2029  * identify whether the table should be logically logged
2030  * without mapping the relfilenode to the oid.
2031  */
2032  if (reloid == InvalidOid &&
2033  change->data.tp.newtuple == NULL &&
2034  change->data.tp.oldtuple == NULL)
2035  goto change_done;
2036  else if (reloid == InvalidOid)
2037  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2038  relpathperm(change->data.tp.relnode,
2039  MAIN_FORKNUM));
2040 
2041  relation = RelationIdGetRelation(reloid);
2042 
2043  if (!RelationIsValid(relation))
2044  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2045  reloid,
2046  relpathperm(change->data.tp.relnode,
2047  MAIN_FORKNUM));
2048 
2049  if (!RelationIsLogicallyLogged(relation))
2050  goto change_done;
2051 
2052  /*
2053  * Ignore temporary heaps created during DDL unless the
2054  * plugin has asked for them.
2055  */
2056  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2057  goto change_done;
2058 
2059  /*
2060  * For now ignore sequence changes entirely. Most of the
2061  * time they don't log changes using records we
2062  * understand, so it doesn't make sense to handle the few
2063  * cases we do.
2064  */
2065  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2066  goto change_done;
2067 
2068  /* user-triggered change */
2069  if (!IsToastRelation(relation))
2070  {
2071  ReorderBufferToastReplace(rb, txn, relation, change);
2072  ReorderBufferApplyChange(rb, txn, relation, change,
2073  streaming);
2074 
2075  /*
2076  * Only clear reassembled toast chunks if we're sure
2077  * they're not required anymore. The creator of the
2078  * tuple tells us.
2079  */
2080  if (change->data.tp.clear_toast_afterwards)
2081  ReorderBufferToastReset(rb, txn);
2082  }
2083  /* we're not interested in toast deletions */
2084  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2085  {
2086  /*
2087  * Need to reassemble the full toasted Datum in
2088  * memory, to ensure the chunks don't get reused till
2089  * we're done remove it from the list of this
2090  * transaction's changes. Otherwise it will get
2091  * freed/reused while restoring spooled data from
2092  * disk.
2093  */
2094  Assert(change->data.tp.newtuple != NULL);
2095 
2096  dlist_delete(&change->node);
2097  ReorderBufferToastAppendChunk(rb, txn, relation,
2098  change);
2099  }
2100 
2101  change_done:
2102 
2103  /*
2104  * Either speculative insertion was confirmed, or it was
2105  * unsuccessful and the record isn't needed anymore.
2106  */
2107  if (specinsert != NULL)
2108  {
2109  ReorderBufferReturnChange(rb, specinsert, true);
2110  specinsert = NULL;
2111  }
2112 
2113  if (RelationIsValid(relation))
2114  {
2115  RelationClose(relation);
2116  relation = NULL;
2117  }
2118  break;
2119 
2121 
2122  /*
2123  * Speculative insertions are dealt with by delaying the
2124  * processing of the insert until the confirmation record
2125  * arrives. For that we simply unlink the record from the
2126  * chain, so it does not get freed/reused while restoring
2127  * spooled data from disk.
2128  *
2129  * This is safe in the face of concurrent catalog changes
2130  * because the relevant relation can't be changed between
2131  * speculative insertion and confirmation due to
2132  * CheckTableNotInUse() and locking.
2133  */
2134 
2135  /* clear out a pending (and thus failed) speculation */
2136  if (specinsert != NULL)
2137  {
2138  ReorderBufferReturnChange(rb, specinsert, true);
2139  specinsert = NULL;
2140  }
2141 
2142  /* and memorize the pending insertion */
2143  dlist_delete(&change->node);
2144  specinsert = change;
2145  break;
2146 
2148  {
2149  int i;
2150  int nrelids = change->data.truncate.nrelids;
2151  int nrelations = 0;
2152  Relation *relations;
2153 
2154  relations = palloc0(nrelids * sizeof(Relation));
2155  for (i = 0; i < nrelids; i++)
2156  {
2157  Oid relid = change->data.truncate.relids[i];
2158  Relation relation;
2159 
2160  relation = RelationIdGetRelation(relid);
2161 
2162  if (!RelationIsValid(relation))
2163  elog(ERROR, "could not open relation with OID %u", relid);
2164 
2165  if (!RelationIsLogicallyLogged(relation))
2166  continue;
2167 
2168  relations[nrelations++] = relation;
2169  }
2170 
2171  /* Apply the truncate. */
2172  ReorderBufferApplyTruncate(rb, txn, nrelations,
2173  relations, change,
2174  streaming);
2175 
2176  for (i = 0; i < nrelations; i++)
2177  RelationClose(relations[i]);
2178 
2179  break;
2180  }
2181 
2183  ReorderBufferApplyMessage(rb, txn, change, streaming);
2184  break;
2185 
2187  /* get rid of the old */
2188  TeardownHistoricSnapshot(false);
2189 
2190  if (snapshot_now->copied)
2191  {
2192  ReorderBufferFreeSnap(rb, snapshot_now);
2193  snapshot_now =
2194  ReorderBufferCopySnap(rb, change->data.snapshot,
2195  txn, command_id);
2196  }
2197 
2198  /*
2199  * Restored from disk, need to be careful not to double
2200  * free. We could introduce refcounting for that, but for
2201  * now this seems infrequent enough not to care.
2202  */
2203  else if (change->data.snapshot->copied)
2204  {
2205  snapshot_now =
2206  ReorderBufferCopySnap(rb, change->data.snapshot,
2207  txn, command_id);
2208  }
2209  else
2210  {
2211  snapshot_now = change->data.snapshot;
2212  }
2213 
2214  /* and continue with the new one */
2215  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2216  break;
2217 
2219  Assert(change->data.command_id != InvalidCommandId);
2220 
2221  if (command_id < change->data.command_id)
2222  {
2223  command_id = change->data.command_id;
2224 
2225  if (!snapshot_now->copied)
2226  {
2227  /* we don't use the global one anymore */
2228  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2229  txn, command_id);
2230  }
2231 
2232  snapshot_now->curcid = command_id;
2233 
2234  TeardownHistoricSnapshot(false);
2235  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2236 
2237  /*
2238  * Every time the CommandId is incremented, we could
2239  * see new catalog contents, so execute all
2240  * invalidations.
2241  */
2243  }
2244 
2245  break;
2246 
2248  elog(ERROR, "tuplecid value in changequeue");
2249  break;
2250  }
2251  }
2252 
2253  /*
2254  * There's a speculative insertion remaining, just clean in up, it
2255  * can't have been successful, otherwise we'd gotten a confirmation
2256  * record.
2257  */
2258  if (specinsert)
2259  {
2260  ReorderBufferReturnChange(rb, specinsert, true);
2261  specinsert = NULL;
2262  }
2263 
2264  /* clean up the iterator */
2265  ReorderBufferIterTXNFinish(rb, iterstate);
2266  iterstate = NULL;
2267 
2268  /*
2269  * Done with current changes, send the last message for this set of
2270  * changes depending upon streaming mode.
2271  */
2272  if (streaming)
2273  {
2274  if (stream_started)
2275  {
2276  rb->stream_stop(rb, txn, prev_lsn);
2277  stream_started = false;
2278  }
2279  }
2280  else
2281  rb->commit(rb, txn, commit_lsn);
2282 
2283  /* this is just a sanity check against bad output plugin behaviour */
2285  elog(ERROR, "output plugin used XID %u",
2287 
2288  /*
2289  * Remember the command ID and snapshot for the next set of changes in
2290  * streaming mode.
2291  */
2292  if (streaming)
2293  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2294  else if (snapshot_now->copied)
2295  ReorderBufferFreeSnap(rb, snapshot_now);
2296 
2297  /* cleanup */
2298  TeardownHistoricSnapshot(false);
2299 
2300  /*
2301  * Aborting the current (sub-)transaction as a whole has the right
2302  * semantics. We want all locks acquired in here to be released, not
2303  * reassigned to the parent and we do not want any database access
2304  * have persistent effects.
2305  */
2307 
2308  /* make sure there's no cache pollution */
2310 
2311  if (using_subtxn)
2313 
2314  /*
2315  * If we are streaming the in-progress transaction then discard the
2316  * changes that we just streamed, and mark the transactions as
2317  * streamed (if they contained changes). Otherwise, remove all the
2318  * changes and deallocate the ReorderBufferTXN.
2319  */
2320  if (streaming)
2321  {
2322  ReorderBufferTruncateTXN(rb, txn);
2323 
2324  /* Reset the CheckXidAlive */
2326  }
2327  else
2328  ReorderBufferCleanupTXN(rb, txn);
2329  }
2330  PG_CATCH();
2331  {
2332  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2333  ErrorData *errdata = CopyErrorData();
2334 
2335  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2336  if (iterstate)
2337  ReorderBufferIterTXNFinish(rb, iterstate);
2338 
2340 
2341  /*
2342  * Force cache invalidation to happen outside of a valid transaction
2343  * to prevent catalog access as we just caught an error.
2344  */
2346 
2347  /* make sure there's no cache pollution */
2349 
2350  if (using_subtxn)
2352 
2353  /*
2354  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2355  * abort of the (sub)transaction we are streaming. We need to do the
2356  * cleanup and return gracefully on this error, see SetupCheckXidLive.
2357  */
2358  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
2359  {
2360  /*
2361  * This error can only occur when we are sending the data in
2362  * streaming mode and the streaming is not finished yet.
2363  */
2364  Assert(streaming);
2365  Assert(stream_started);
2366 
2367  /* Cleanup the temporary error state. */
2368  FlushErrorState();
2369  FreeErrorData(errdata);
2370  errdata = NULL;
2371  curtxn->concurrent_abort = true;
2372 
2373  /* Reset the TXN so that it is allowed to stream remaining data. */
2374  ReorderBufferResetTXN(rb, txn, snapshot_now,
2375  command_id, prev_lsn,
2376  specinsert);
2377  }
2378  else
2379  {
2380  ReorderBufferCleanupTXN(rb, txn);
2381  MemoryContextSwitchTo(ecxt);
2382  PG_RE_THROW();
2383  }
2384  }
2385  PG_END_TRY();
2386 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void AbortCurrentTransaction(void)
Definition: xact.c:3211
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
struct ReorderBufferChange::@99::@101 truncate
bool copied
Definition: snapshot.h:185
int sqlerrcode
Definition: elog.h:364
ErrorData * CopyErrorData(void)
Definition: elog.c:1474
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2074
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void FlushErrorState(void)
Definition: elog.c:1568
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1530
#define ERROR
Definition: elog.h:43
#define RelationIsValid(relation)
Definition: rel.h:429
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4513
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
#define InvalidTransactionId
Definition: transam.h:31
void RelationClose(Relation relation)
Definition: relcache.c:2110
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
RepOriginId origin_id
Definition: reorderbuffer.h:88
TransactionId CheckXidAlive
Definition: xact.c:95
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:981
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
#define InvalidCommandId
Definition: c.h:537
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferChange::@99 data
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:305
struct ReorderBufferChange::@99::@100 tp
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void StartTransactionCommand(void)
Definition: xact.c:2846
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4408
#define PG_RE_THROW()
Definition: elog.h:336
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2058
#define elog(elevel,...)
Definition: elog.h:214
int i
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:295
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2004
#define PG_END_TRY()
Definition: elog.h:320
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2629 of file reorderbuffer.c.

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

2630 {
2631  /* many records won't have an xid assigned, centralize check here */
2632  if (xid != InvalidTransactionId)
2633  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2634 }
#define InvalidTransactionId
Definition: transam.h:31
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 732 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

734 {
735  ReorderBufferTXN *txn;
736 
737  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
738 
739  /*
740  * While streaming the previous changes we have detected that the
741  * transaction is aborted. So there is no point in collecting further
742  * changes for it.
743  */
744  if (txn->concurrent_abort)
745  {
746  /*
747  * We don't need to update memory accounting for this change as we
748  * have not added it to the queue yet.
749  */
750  ReorderBufferReturnChange(rb, change, false);
751  return;
752  }
753 
754  change->lsn = lsn;
755  change->txn = txn;
756 
757  Assert(InvalidXLogRecPtr != lsn);
758  dlist_push_tail(&txn->changes, &change->node);
759  txn->nentries++;
760  txn->nentries_mem++;
761 
762  /* update memory accounting information */
763  ReorderBufferChangeMemoryUpdate(rb, change, true);
764 
765  /* process partial change */
766  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
767 
768  /* check the memory limits and evict something if needed */
770 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
dlist_head changes
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define Assert(condition)
Definition: c.h:745
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 776 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), TeardownHistoricSnapshot(), and ReorderBufferTXNByIdEnt::txn.

Referenced by DecodeLogicalMsgOp().

780 {
781  if (transactional)
782  {
783  MemoryContext oldcontext;
784  ReorderBufferChange *change;
785 
787 
788  oldcontext = MemoryContextSwitchTo(rb->context);
789 
790  change = ReorderBufferGetChange(rb);
792  change->data.msg.prefix = pstrdup(prefix);
793  change->data.msg.message_size = message_size;
794  change->data.msg.message = palloc(message_size);
795  memcpy(change->data.msg.message, message, message_size);
796 
797  ReorderBufferQueueChange(rb, xid, lsn, change, false);
798 
799  MemoryContextSwitchTo(oldcontext);
800  }
801  else
802  {
803  ReorderBufferTXN *txn = NULL;
804  volatile Snapshot snapshot_now = snapshot;
805 
806  if (xid != InvalidTransactionId)
807  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
808 
809  /* setup snapshot to allow catalog access */
810  SetupHistoricSnapshot(snapshot_now, NULL);
811  PG_TRY();
812  {
813  rb->message(rb, txn, lsn, false, prefix, message_size, message);
814 
816  }
817  PG_CATCH();
818  {
820  PG_RE_THROW();
821  }
822  PG_END_TRY();
823  }
824 }
struct ReorderBufferChange::@99::@102 msg
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2074
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
union ReorderBufferChange::@99 data
#define PG_CATCH()
Definition: elog.h:305
#define Assert(condition)
Definition: c.h:745
#define PG_RE_THROW()
Definition: elog.h:336
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:950
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2058
#define PG_TRY()
Definition: elog.h:295
#define PG_END_TRY()
Definition: elog.h:320

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 1876 of file reorderbuffer.c.

References ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

1881 {
1882  /* Discard the changes that we just streamed */
1883  ReorderBufferTruncateTXN(rb, txn);
1884 
1885  /* Free all resources allocated for toast reconstruction */
1886  ReorderBufferToastReset(rb, txn);
1887 
1888  /* Return the spec insert change if it is not NULL */
1889  if (specinsert != NULL)
1890  {
1891  ReorderBufferReturnChange(rb, specinsert, true);
1892  specinsert = NULL;
1893  }
1894 
1895  /* Stop the stream. */
1896  rb->stream_stop(rb, txn, last_lsn);
1897 
1898  /* Remember the command ID and snapshot for the streaming run */
1899  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1900 }
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
ReorderBufferStreamStopCB stream_stop

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 3733 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

3735 {
3736  ReorderBufferDiskChange *ondisk;
3737  ReorderBufferChange *change;
3738 
3739  ondisk = (ReorderBufferDiskChange *) data;
3740 
3741  change = ReorderBufferGetChange(rb);
3742 
3743  /* copy static part */
3744  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
3745 
3746  data += sizeof(ReorderBufferDiskChange);
3747 
3748  /* restore individual stuff */
3749  switch (change->action)
3750  {
3751  /* fall through these, they're all similar enough */
3756  if (change->data.tp.oldtuple)
3757  {
3758  uint32 tuplelen = ((HeapTuple) data)->t_len;
3759 
3760  change->data.tp.oldtuple =
3762 
3763  /* restore ->tuple */
3764  memcpy(&change->data.tp.oldtuple->tuple, data,
3765  sizeof(HeapTupleData));
3766  data += sizeof(HeapTupleData);
3767 
3768  /* reset t_data pointer into the new tuplebuf */
3769  change->data.tp.oldtuple->tuple.t_data =
3770  ReorderBufferTupleBufData(change->data.tp.oldtuple);
3771 
3772  /* restore tuple data itself */
3773  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
3774  data += tuplelen;
3775  }
3776 
3777  if (change->data.tp.newtuple)
3778  {
3779  /* here, data might not be suitably aligned! */
3780  uint32 tuplelen;
3781 
3782  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
3783  sizeof(uint32));
3784 
3785  change->data.tp.newtuple =
3787 
3788  /* restore ->tuple */
3789  memcpy(&change->data.tp.newtuple->tuple, data,
3790  sizeof(HeapTupleData));
3791  data += sizeof(HeapTupleData);
3792 
3793  /* reset t_data pointer into the new tuplebuf */
3794  change->data.tp.newtuple->tuple.t_data =
3795  ReorderBufferTupleBufData(change->data.tp.newtuple);
3796 
3797  /* restore tuple data itself */
3798  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
3799  data += tuplelen;
3800  }
3801 
3802  break;
3804  {
3805  Size prefix_size;
3806 
3807  /* read prefix */
3808  memcpy(&prefix_size, data, sizeof(Size));
3809  data += sizeof(Size);
3810  change->data.msg.prefix = MemoryContextAlloc(rb->context,
3811  prefix_size);
3812  memcpy(change->data.msg.prefix, data, prefix_size);
3813  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
3814  data += prefix_size;
3815 
3816  /* read the message */
3817  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3818  data += sizeof(Size);
3819  change->data.msg.message = MemoryContextAlloc(rb->context,
3820  change->data.msg.message_size);
3821  memcpy(change->data.msg.message, data,
3822  change->data.msg.message_size);
3823  data += change->data.msg.message_size;
3824 
3825  break;
3826  }
3828  {
3829  Snapshot oldsnap;
3830  Snapshot newsnap;
3831  Size size;
3832 
3833  oldsnap = (Snapshot) data;
3834 
3835  size = sizeof(SnapshotData) +
3836  sizeof(TransactionId) * oldsnap->xcnt +
3837  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3838 
3839  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3840 
3841  newsnap = change->data.snapshot;
3842 
3843  memcpy(newsnap, data, size);
3844  newsnap->xip = (TransactionId *)
3845  (((char *) newsnap) + sizeof(SnapshotData));
3846  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3847  newsnap->copied = true;
3848  break;
3849  }
3850  /* the base struct contains all the data, easy peasy */
3852  {
3853  Oid *relids;
3854 
3855  relids = ReorderBufferGetRelids(rb,
3856  change->data.truncate.nrelids);
3857  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3858  change->data.truncate.relids = relids;
3859 
3860  break;
3861  }
3865  break;
3866  }
3867 
3868  dlist_push_tail(&txn->changes, &change->node);
3869  txn->nentries_mem++;
3870 
3871  /*
3872  * Update memory accounting for the restored change. We need to do this
3873  * although we don't check the memory limit when restoring the changes in
3874  * this branch (we only do that when initially queueing the changes after
3875  * decoding), because we will release the changes later, and that will
3876  * update the accounting too (subtracting the size from the counters). And
3877  * we don't want to underflow there.
3878  */
3879  ReorderBufferChangeMemoryUpdate(rb, change, true);
3880 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
HeapTupleData * HeapTuple
Definition: htup.h:71
struct ReorderBufferChange::@99::@101 truncate
uint32 TransactionId
Definition: c.h:520
struct ReorderBufferChange::@99::@102 msg
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:374
TransactionId * xip
Definition: snapshot.h:168
MemoryContext context
union ReorderBufferChange::@99 data
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
struct ReorderBufferDiskChange ReorderBufferDiskChange
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:745
size_t Size
Definition: c.h:473
uint32 xcnt
Definition: snapshot.h:169
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:668
int32 subxcnt
Definition: snapshot.h:181
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 3592 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

3594 {
3595  Size restored = 0;
3596  XLogSegNo last_segno;
3597  dlist_mutable_iter cleanup_iter;
3598  File *fd = &file->vfd;
3599 
3602 
3603  /* free current entries, so we have memory for more */
3604  dlist_foreach_modify(cleanup_iter, &txn->changes)
3605  {
3607  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
3608 
3609  dlist_delete(&cleanup->node);
3610  ReorderBufferReturnChange(rb, cleanup, true);
3611  }
3612  txn->nentries_mem = 0;
3613  Assert(dlist_is_empty(&txn->changes));
3614 
3615  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
3616 
3617  while (restored < max_changes_in_memory && *segno <= last_segno)
3618  {
3619  int readBytes;
3620  ReorderBufferDiskChange *ondisk;
3621 
3622  if (*fd == -1)
3623  {
3624  char path[MAXPGPATH];
3625 
3626  /* first time in */
3627  if (*segno == 0)
3628  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
3629 
3630  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
3631 
3632  /*
3633  * No need to care about TLIs here, only used during a single run,
3634  * so each LSN only maps to a specific WAL record.
3635  */
3637  *segno);
3638 
3639  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
3640 
3641  /* No harm in resetting the offset even in case of failure */
3642  file->curOffset = 0;
3643 
3644  if (*fd < 0 && errno == ENOENT)
3645  {
3646  *fd = -1;
3647  (*segno)++;
3648  continue;
3649  }
3650  else if (*fd < 0)
3651  ereport(ERROR,
3653  errmsg("could not open file \"%s\": %m",
3654  path)));
3655  }
3656 
3657  /*
3658  * Read the statically sized part of a change which has information
3659  * about the total size. If we couldn't read a record, we're at the
3660  * end of this file.
3661  */
3663  readBytes = FileRead(file->vfd, rb->outbuf,
3664  sizeof(ReorderBufferDiskChange),
3666 
3667  /* eof */
3668  if (readBytes == 0)
3669  {
3670  FileClose(*fd);
3671  *fd = -1;
3672  (*segno)++;
3673  continue;
3674  }
3675  else if (readBytes < 0)
3676  ereport(ERROR,
3678  errmsg("could not read from reorderbuffer spill file: %m")));
3679  else if (readBytes != sizeof(ReorderBufferDiskChange))
3680  ereport(ERROR,
3682  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3683  readBytes,
3684  (uint32) sizeof(ReorderBufferDiskChange))));
3685 
3686  file->curOffset += readBytes;
3687 
3688  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3689 
3691  sizeof(ReorderBufferDiskChange) + ondisk->size);
3692  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3693 
3694  readBytes = FileRead(file->vfd,
3695  rb->outbuf + sizeof(ReorderBufferDiskChange),
3696  ondisk->size - sizeof(ReorderBufferDiskChange),
3697  file->curOffset,
3699 
3700  if (readBytes < 0)
3701  ereport(ERROR,
3703  errmsg("could not read from reorderbuffer spill file: %m")));
3704  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
3705  ereport(ERROR,
3707  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3708  readBytes,
3709  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
3710 
3711  file->curOffset += readBytes;
3712 
3713  /*
3714  * ok, read a full change from disk, now restore it into proper
3715  * in-memory format
3716  */
3717  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
3718  restored++;
3719  }
3720 
3721  return restored;
3722 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1437
int wal_segment_size
Definition: xlog.c:117
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1211
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
unsigned int uint32
Definition: c.h:374
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:862
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:144
struct ReorderBufferDiskChange ReorderBufferDiskChange
void FileClose(File file)
Definition: fd.c:1826
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:745
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:473
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:824
static const Size max_changes_in_memory
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1973
int File
Definition: fd.h:49
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3886 of file reorderbuffer.c.

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

3887 {
3888  XLogSegNo first;
3889  XLogSegNo cur;
3890  XLogSegNo last;
3891 
3894 
3895  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3896  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3897 
3898  /* iterate over all possible filenames, and delete them */
3899  for (cur = first; cur <= last; cur++)
3900  {
3901  char path[MAXPGPATH];
3902 
3904  if (unlink(path) != 0 && errno != ENOENT)
3905  ereport(ERROR,
3907  errmsg("could not remove file \"%s\": %m", path)));
3908  }
3909 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int wal_segment_size
Definition: xlog.c:117
struct cursor * cur
Definition: ecpg.c:28
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
XLogRecPtr final_lsn
TransactionId xid
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:745
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 451 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

453 {
454  /* update memory accounting info */
455  if (upd_mem)
456  ReorderBufferChangeMemoryUpdate(rb, change, false);
457 
458  /* free contained data */
459  switch (change->action)
460  {
465  if (change->data.tp.newtuple)
466  {
467  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
468  change->data.tp.newtuple = NULL;
469  }
470 
471  if (change->data.tp.oldtuple)
472  {
473  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
474  change->data.tp.oldtuple = NULL;
475  }
476  break;
478  if (change->data.msg.prefix != NULL)
479  pfree(change->data.msg.prefix);
480  change->data.msg.prefix = NULL;
481  if (change->data.msg.message != NULL)
482  pfree(change->data.msg.message);
483  change->data.msg.message = NULL;
484  break;
486  if (change->data.snapshot)
487  {
488  ReorderBufferFreeSnap(rb, change->data.snapshot);
489  change->data.snapshot = NULL;
490  }
491  break;
492  /* no data in addition to the struct itself */
494  if (change->data.truncate.relids != NULL)
495  {
496  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
497  change->data.truncate.relids = NULL;
498  }
499  break;
503  break;
504  }
505 
506  pfree(change);
507 }
struct ReorderBufferChange::@99::@101 truncate
struct ReorderBufferChange::@99::@102 msg
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void pfree(void *pointer)
Definition: mcxt.c:1057
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
union ReorderBufferChange::@99 data
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
struct ReorderBufferChange::@99::@100 tp
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 566 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

567 {
568  pfree(relids);
569 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 535 of file reorderbuffer.c.

References pfree().

Referenced by ReorderBufferReturnChange().

536 {
537  pfree(tuple);
538 }
void pfree(void *pointer)
Definition: mcxt.c:1057

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 406 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

407 {
408  /* clean the lookup cache if we were cached (quite likely) */
409  if (rb->by_txn_last_xid == txn->xid)
410  {
412  rb->by_txn_last_txn = NULL;
413  }
414 
415  /* free data that's contained */
416 
417  if (txn->tuplecid_hash != NULL)
418  {
420  txn->tuplecid_hash = NULL;
421  }
422 
423  if (txn->invalidations)
424  {
425  pfree(txn->invalidations);
426  txn->invalidations = NULL;
427  }
428 
429  pfree(txn);
430 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:827
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:1057
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
SharedInvalidationMessage * invalidations

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 1857 of file reorderbuffer.c.

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

1859 {
1860  txn->command_id = command_id;
1861 
1862  /* Avoid copying if it's already copied. */
1863  if (snapshot_now->copied)
1864  txn->snapshot_now = snapshot_now;
1865  else
1866  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1867  txn, command_id);
1868 }
bool copied
Definition: snapshot.h:185
CommandId command_id
Snapshot snapshot_now
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 3186 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

3188 {
3189  ReorderBufferDiskChange *ondisk;
3190  Size sz = sizeof(ReorderBufferDiskChange);
3191 
3193 
3194  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3195  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3196 
3197  switch (change->action)
3198  {
3199  /* fall through these, they're all similar enough */
3204  {
3205  char *data;
3206  ReorderBufferTupleBuf *oldtup,
3207  *newtup;
3208  Size oldlen = 0;
3209  Size newlen = 0;
3210 
3211  oldtup = change->data.tp.oldtuple;
3212  newtup = change->data.tp.newtuple;
3213 
3214  if (oldtup)
3215  {
3216  sz += sizeof(HeapTupleData);
3217  oldlen = oldtup->tuple.t_len;
3218  sz += oldlen;
3219  }
3220 
3221  if (newtup)
3222  {
3223  sz += sizeof(HeapTupleData);
3224  newlen = newtup->tuple.t_len;
3225  sz += newlen;
3226  }
3227 
3228  /* make sure we have enough space */
3230 
3231  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3232  /* might have been reallocated above */
3233  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3234 
3235  if (oldlen)
3236  {
3237  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3238  data += sizeof(HeapTupleData);
3239 
3240  memcpy(data, oldtup->tuple.t_data, oldlen);
3241  data += oldlen;
3242  }
3243 
3244  if (newlen)
3245  {
3246  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3247  data += sizeof(HeapTupleData);
3248 
3249  memcpy(data, newtup->tuple.t_data, newlen);
3250  data += newlen;
3251  }
3252  break;
3253  }
3255  {
3256  char *data;
3257  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3258 
3259  sz += prefix_size + change->data.msg.message_size +
3260  sizeof(Size) + sizeof(Size);
3262 
3263  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3264 
3265  /* might have been reallocated above */
3266  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3267 
3268  /* write the prefix including the size */
3269  memcpy(data, &prefix_size, sizeof(Size));
3270  data += sizeof(Size);
3271  memcpy(data, change->data.msg.prefix,
3272  prefix_size);
3273  data += prefix_size;
3274 
3275  /* write the message including the size */
3276  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3277  data += sizeof(Size);
3278  memcpy(data, change->data.msg.message,
3279  change->data.msg.message_size);
3280  data += change->data.msg.message_size;
3281 
3282  break;
3283  }
3285  {
3286  Snapshot snap;
3287  char *data;
3288 
3289  snap = change->data.snapshot;
3290 
3291  sz += sizeof(SnapshotData) +
3292  sizeof(TransactionId) * snap->xcnt +
3293  sizeof(TransactionId) * snap->subxcnt;
3294 
3295  /* make sure we have enough space */
3297  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3298  /* might have been reallocated above */
3299  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3300 
3301  memcpy(data, snap, sizeof(SnapshotData));
3302  data += sizeof(SnapshotData);
3303 
3304  if (snap->xcnt)
3305  {
3306  memcpy(data, snap->xip,
3307  sizeof(TransactionId) * snap->xcnt);
3308  data += sizeof(TransactionId) * snap->xcnt;
3309  }
3310 
3311  if (snap->subxcnt)
3312  {
3313  memcpy(data, snap->subxip,
3314  sizeof(TransactionId) * snap->subxcnt);
3315  data += sizeof(TransactionId) * snap->subxcnt;
3316  }
3317  break;
3318  }
3320  {
3321  Size size;
3322  char *data;
3323 
3324  /* account for the OIDs of truncated relations */
3325  size = sizeof(Oid) * change->data.truncate.nrelids;
3326  sz += size;
3327 
3328  /* make sure we have enough space */
3330 
3331  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3332  /* might have been reallocated above */
3333  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3334 
3335  memcpy(data, change->data.truncate.relids, size);
3336  data += size;
3337 
3338  break;
3339  }
3340  case</