PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 190 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
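( \
    (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
    ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)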
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:65
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:66

Definition at line 185 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 181 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB * tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4987 of file reorderbuffer.c.

4988 {
4989  char path[MAXPGPATH];
4990  int fd;
4991  int readBytes;
4993 
4994  sprintf(path, "pg_logical/mappings/%s", fname);
4995  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4996  if (fd < 0)
4997  ereport(ERROR,
4999  errmsg("could not open file \"%s\": %m", path)));
5000 
5001  while (true)
5002  {
5005  ReorderBufferTupleCidEnt *new_ent;
5006  bool found;
5007 
5008  /* be careful about padding */
5009  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5010 
5011  /* read all mappings till the end of the file */
5013  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5015 
5016  if (readBytes < 0)
5017  ereport(ERROR,
5019  errmsg("could not read file \"%s\": %m",
5020  path)));
5021  else if (readBytes == 0) /* EOF */
5022  break;
5023  else if (readBytes != sizeof(LogicalRewriteMappingData))
5024  ereport(ERROR,
5026  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5027  path, readBytes,
5028  (int32) sizeof(LogicalRewriteMappingData))));
5029 
5030  key.rlocator = map.old_locator;
5031  ItemPointerCopy(&map.old_tid,
5032  &key.tid);
5033 
5034 
5035  ent = (ReorderBufferTupleCidEnt *)
5037  (void *) &key,
5038  HASH_FIND,
5039  NULL);
5040 
5041  /* no existing mapping, no need to update */
5042  if (!ent)
5043  continue;
5044 
5045  key.rlocator = map.new_locator;
5046  ItemPointerCopy(&map.new_tid,
5047  &key.tid);
5048 
5049  new_ent = (ReorderBufferTupleCidEnt *)
5051  (void *) &key,
5052  HASH_ENTER,
5053  &found);
5054 
5055  if (found)
5056  {
5057  /*
5058  * Make sure the existing mapping makes sense. We sometimes update
5059  * old records that did not yet have a cmax (e.g. pg_class' own
5060  * entry while rewriting it) during rewrites, so allow that.
5061  */
5062  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5063  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5064  }
5065  else
5066  {
5067  /* update mapping */
5068  new_ent->cmin = ent->cmin;
5069  new_ent->cmax = ent->cmax;
5070  new_ent->combocid = ent->combocid;
5071  }
5072  }
5073 
5074  if (CloseTransientFile(fd) != 0)
5075  ereport(ERROR,
5077  errmsg("could not close file \"%s\": %m", path)));
5078 }
#define InvalidCommandId
Definition: c.h:605
signed int int32
Definition: c.h:430
#define PG_BINARY
Definition: c.h:1209
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
int errcode_for_file_access(void)
Definition: elog.c:881
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2609
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2433
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static HTAB * tuplecid_data
Definition: snapmgr.c:117
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
@ WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ
Definition: wait_event.h:202
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:268
static void pgstat_report_wait_end(void)
Definition: wait_event.h:284

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, tuplecid_data, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN * txn)
static

Definition at line 974 of file reorderbuffer.c.

975 {
976 #ifdef USE_ASSERT_CHECKING
977  dlist_iter iter;
978  XLogRecPtr prev_lsn = txn->first_lsn;
979 
980  dlist_foreach(iter, &txn->changes)
981  {
982  ReorderBufferChange *cur_change;
983 
984  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
985 
987  Assert(cur_change->lsn != InvalidXLogRecPtr);
988  Assert(txn->first_lsn <= cur_change->lsn);
989 
990  if (txn->end_lsn != InvalidXLogRecPtr)
991  Assert(cur_change->lsn <= txn->end_lsn);
992 
993  Assert(prev_lsn <= cur_change->lsn);
994 
995  prev_lsn = cur_change->lsn;
996  }
997 #endif
998 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:573
#define dlist_container(type, membername, ptr)
Definition: ilist.h:543
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer * rb)
static

Definition at line 903 of file reorderbuffer.c.

904 {
905 #ifdef USE_ASSERT_CHECKING
907  dlist_iter iter;
908  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
909  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
910 
911  /*
912  * Skip the verification if we don't reach the LSN at which we start
913  * decoding the contents of transactions yet because until we reach the
914  * LSN, we could have transactions that don't have the association between
915  * the top-level transaction and subtransaction yet and consequently have
916  * the same LSN. We don't guarantee this association until we try to
917  * decode the actual contents of transaction. The ordering of the records
918  * prior to the start_decoding_at LSN should have been checked before the
919  * restart.
920  */
922  return;
923 
924  dlist_foreach(iter, &rb->toplevel_by_lsn)
925  {
927  iter.cur);
928 
929  /* start LSN must be set */
930  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
931 
932  /* If there is an end LSN, it must be higher than start LSN */
933  if (cur_txn->end_lsn != InvalidXLogRecPtr)
934  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
935 
936  /* Current initial LSN must be strictly higher than previous */
937  if (prev_first_lsn != InvalidXLogRecPtr)
938  Assert(prev_first_lsn < cur_txn->first_lsn);
939 
940  /* known-as-subtxn txns must not be listed */
941  Assert(!rbtxn_is_known_subxact(cur_txn));
942 
943  prev_first_lsn = cur_txn->first_lsn;
944  }
945 
947  {
949  base_snapshot_node,
950  iter.cur);
951 
952  /* base snapshot (and its LSN) must be set */
953  Assert(cur_txn->base_snapshot != NULL);
955 
956  /* current LSN must be strictly higher than previous */
957  if (prev_base_snap_lsn != InvalidXLogRecPtr)
958  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
959 
960  /* known-as-subtxn txns must not be listed */
961  Assert(!rbtxn_is_known_subxact(cur_txn));
962 
963  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
964  }
965 #endif
966 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:429
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell * a_p,
const ListCell * b_p 
)
static

Definition at line 5095 of file reorderbuffer.c.

5096 {
5099 
5100  if (a->lsn < b->lsn)
5101  return -1;
5102  else if (a->lsn > b->lsn)
5103  return 1;
5104  return 0;
5105 }
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:170

References a, b, and lfirst.

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2873 of file reorderbuffer.c.

2874 {
2875  ReorderBufferTXN *txn;
2876 
2877  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2878  false);
2879 
2880  /* unknown, nothing to remove */
2881  if (txn == NULL)
2882  return;
2883 
2884  /* For streamed transactions notify the remote node about the abort. */
2885  if (rbtxn_is_streamed(txn))
2886  {
2887  rb->stream_abort(rb, txn, lsn);
2888 
2889  /*
2890  * We might have decoded changes for this transaction that could load
2891  * the cache as per the current transaction's view (consider DDL's
2892  * happened in this transaction). We don't want the decoding of future
2893  * transactions to use those cache entries so execute invalidations.
2894  */
2895  if (txn->ninvalidations > 0)
2897  txn->invalidations);
2898  }
2899 
2900  /* cosmetic... */
2901  txn->final_lsn = lsn;
2902 
2903  /* remove potential on-disk data, and deallocate */
2904  ReorderBufferCleanupTXN(rb, txn);
2905 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
XLogRecPtr final_lsn
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 2915 of file reorderbuffer.c.

2916 {
2917  dlist_mutable_iter it;
2918 
2919  /*
2920  * Iterate through all (potential) toplevel TXNs and abort all that are
2921  * older than what possibly can be running. Once we've found the first
2922  * that is alive we stop, there might be some that acquired an xid earlier
2923  * but started writing later, but it's unlikely and they will be cleaned
2924  * up in a later call to this function.
2925  */
2927  {
2928  ReorderBufferTXN *txn;
2929 
2930  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2931 
2932  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2933  {
2934  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2935 
2936  /* remove potential on-disk data, and deallocate this tx */
2937  ReorderBufferCleanupTXN(rb, txn);
2938  }
2939  else
2940  return;
2941  }
2942 }
#define DEBUG2
Definition: elog.h:29
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:590
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:273

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog(), ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3247 of file reorderbuffer.c.

3250 {
3251  ReorderBufferTXN *txn;
3252  MemoryContext oldcontext;
3253  ReorderBufferChange *change;
3254 
3255  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3256 
3257  oldcontext = MemoryContextSwitchTo(rb->context);
3258 
3259  /*
3260  * Collect all the invalidations under the top transaction, if available,
3261  * so that we can execute them all together. See comments atop this
3262  * function.
3263  */
3264  if (txn->toptxn)
3265  txn = txn->toptxn;
3266 
3267  Assert(nmsgs > 0);
3268 
3269  /* Accumulate invalidations. */
3270  if (txn->ninvalidations == 0)
3271  {
3272  txn->ninvalidations = nmsgs;
3274  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3275  memcpy(txn->invalidations, msgs,
3276  sizeof(SharedInvalidationMessage) * nmsgs);
3277  }
3278  else
3279  {
3282  (txn->ninvalidations + nmsgs));
3283 
3284  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3285  nmsgs * sizeof(SharedInvalidationMessage));
3286  txn->ninvalidations += nmsgs;
3287  }
3288 
3289  change = ReorderBufferGetChange(rb);
3291  change->data.inval.ninvalidations = nmsgs;
3292  change->data.inval.invalidations = (SharedInvalidationMessage *)
3293  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3294  memcpy(change->data.inval.invalidations, msgs,
3295  sizeof(SharedInvalidationMessage) * nmsgs);
3296 
3297  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3298 
3299  MemoryContextSwitchTo(oldcontext);
3300 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1321
void * palloc(Size size)
Definition: mcxt.c:1199
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:60
ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@102 inval
struct ReorderBufferTXN * toptxn
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), and ReorderBufferTXN::toptxn.

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3210 of file reorderbuffer.c.

3214 {
3216  ReorderBufferTXN *txn;
3217 
3218  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3219 
3220  change->data.tuplecid.locator = locator;
3221  change->data.tuplecid.tid = tid;
3222  change->data.tuplecid.cmin = cmin;
3223  change->data.tuplecid.cmax = cmax;
3224  change->data.tuplecid.combocid = combocid;
3225  change->lsn = lsn;
3226  change->txn = txn;
3228 
3229  dlist_push_tail(&txn->tuplecids, &change->node);
3230  txn->ntuplecids++;
3231 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:353
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:63
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
struct ReorderBufferChange::@97::@101 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 301 of file reorderbuffer.c.

302 {
303  ReorderBuffer *buffer;
304  HASHCTL hash_ctl;
305  MemoryContext new_ctx;
306 
307  Assert(MyReplicationSlot != NULL);
308 
309  /* allocate memory in own context, to have better accountability */
311  "ReorderBuffer",
313 
314  buffer =
315  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
316 
317  memset(&hash_ctl, 0, sizeof(hash_ctl));
318 
319  buffer->context = new_ctx;
320 
321  buffer->change_context = SlabContextCreate(new_ctx,
322  "Change",
324  sizeof(ReorderBufferChange));
325 
326  buffer->txn_context = SlabContextCreate(new_ctx,
327  "TXN",
329  sizeof(ReorderBufferTXN));
330 
331  /*
332  * XXX the allocation sizes used below pre-date generation context's block
333  * growing code. These values should likely be benchmarked and set to
334  * more suitable values.
335  */
336  buffer->tup_context = GenerationContextCreate(new_ctx,
337  "Tuples",
341 
342  hash_ctl.keysize = sizeof(TransactionId);
343  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
344  hash_ctl.hcxt = buffer->context;
345 
346  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
348 
350  buffer->by_txn_last_txn = NULL;
351 
352  buffer->outbuf = NULL;
353  buffer->outbufsize = 0;
354  buffer->size = 0;
355 
356  buffer->spillTxns = 0;
357  buffer->spillCount = 0;
358  buffer->spillBytes = 0;
359  buffer->streamTxns = 0;
360  buffer->streamCount = 0;
361  buffer->streamBytes = 0;
362  buffer->totalTxns = 0;
363  buffer->totalBytes = 0;
364 
366 
367  dlist_init(&buffer->toplevel_by_lsn);
369  dclist_init(&buffer->catchange_txns);
370 
371  /*
372  * Ensure there's no stale data from prior uses of this slot, in case some
373  * prior exit avoided calling ReorderBufferFree. Failure to do this can
374  * produce duplicated txns, and it's very cheap if there's nothing there.
375  */
377 
378  return buffer;
379 }
#define NameStr(name)
Definition: c.h:682
uint32 TransactionId
Definition: c.h:588
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:158
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:621
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:994
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:182
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:183
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:141
ReplicationSlot * MyReplicationSlot
Definition: slot.c:98
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:147
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1959 of file reorderbuffer.c.

1962 {
1963  if (streaming)
1964  rb->stream_change(rb, txn, relation, change);
1965  else
1966  rb->apply_change(rb, txn, relation, change);
1967 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1987 of file reorderbuffer.c.

1989 {
1990  if (streaming)
1991  rb->stream_message(rb, txn, change->lsn, true,
1992  change->data.msg.prefix,
1993  change->data.msg.message_size,
1994  change->data.msg.message);
1995  else
1996  rb->message(rb, txn, change->lsn, true,
1997  change->data.msg.prefix,
1998  change->data.msg.message_size,
1999  change->data.msg.message);
2000 }
struct ReorderBufferChange::@97::@100 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1973 of file reorderbuffer.c.

1976 {
1977  if (streaming)
1978  rb->stream_truncate(rb, txn, nrelations, relations, change);
1979  else
1980  rb->apply_truncate(rb, txn, nrelations, relations, change);
1981 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1060 of file reorderbuffer.c.

1062 {
1063  ReorderBufferTXN *txn;
1064  ReorderBufferTXN *subtxn;
1065  bool new_top;
1066  bool new_sub;
1067 
1068  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1069  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1070 
1071  if (!new_sub)
1072  {
1073  if (rbtxn_is_known_subxact(subtxn))
1074  {
1075  /* already associated, nothing to do */
1076  return;
1077  }
1078  else
1079  {
1080  /*
1081  * We already saw this transaction, but initially added it to the
1082  * list of top-level txns. Now that we know it's not top-level,
1083  * remove it from there.
1084  */
1085  dlist_delete(&subtxn->node);
1086  }
1087  }
1088 
1089  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1090  subtxn->toplevel_xid = xid;
1091  Assert(subtxn->nsubtxns == 0);
1092 
1093  /* set the reference to top-level transaction */
1094  subtxn->toptxn = txn;
1095 
1096  /* add to subtransaction list */
1097  dlist_push_tail(&txn->subtxns, &subtxn->node);
1098  txn->nsubtxns++;
1099 
1100  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1101  ReorderBufferTransferSnapToParent(txn, subtxn);
1102 
1103  /* Verify LSN-ordering invariant */
1104  AssertTXNLsnOrder(rb);
1105 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:394
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
dlist_head subtxns

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1722 of file reorderbuffer.c.

1723 {
1724  dlist_iter iter;
1725  HASHCTL hash_ctl;
1726 
1727  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1728  return;
1729 
1730  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1731  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1732  hash_ctl.hcxt = rb->context;
1733 
1734  /*
1735  * create the hash with the exact number of to-be-stored tuplecids from
1736  * the start
1737  */
1738  txn->tuplecid_hash =
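1739  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1740  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1741 
1742  dlist_foreach(iter, &txn->tuplecids)
1743  {
1744  ReorderBufferTupleCidKey key;
1745  ReorderBufferTupleCidEnt *ent;
1746  bool found;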
1747  ReorderBufferChange *change;
1748 
1749  change = dlist_container(ReorderBufferChange, node, iter.cur);
1750 
1751  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1752 
1753  /* be careful about padding */
1754  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1755 
1756  key.rlocator = change->data.tuplecid.locator;
1757 
1758  ItemPointerCopy(&change->data.tuplecid.tid,
1759  &key.tid);
1760 
1761  ent = (ReorderBufferTupleCidEnt *)
1762  hash_search(txn->tuplecid_hash,
1763  (void *) &key,
1764  HASH_ENTER,
1765  &found);
1766  if (!found)
1767  {
1768  ent->cmin = change->data.tuplecid.cmin;
1769  ent->cmax = change->data.tuplecid.cmax;
1770  ent->combocid = change->data.tuplecid.combocid;
1771  }
1772  else
1773  {
1774  /*
1775  * Maybe we already saw this tuple before in this transaction, but
1776  * if so it must have the same cmin.
1777  */
1778  Assert(ent->cmin == change->data.tuplecid.cmin);
1779 
1780  /*
1781  * cmax may be initially invalid, but once set it can only grow,
1782  * and never become invalid again.
1783  */
1784  Assert((ent->cmax == InvalidCommandId) ||
1785  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1786  (change->data.tuplecid.cmax > ent->cmax)));
1787  ent->cmax = change->data.tuplecid.cmax;
1788  }
1789  }
1790 }
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:325
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3932 of file reorderbuffer.c.

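3933 {
3934  LogicalDecodingContext *ctx = rb->private_data;
3935  SnapBuild *builder = ctx->snapshot_builder;
3936 
3937  /* We can't start streaming unless a consistent state is reached. */
3938  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3939  return false;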
3940 
3941  /*
3942  * We can't start streaming immediately even if the streaming is enabled
3943  * because we previously decoded this transaction and now just are
3944  * restarting.
3945  */
3946  if (ReorderBufferCanStream(rb) &&
3947  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
3948  return true;
3949 
3950  return false;
3951 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:402
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

Definition at line 3923 of file reorderbuffer.c.

3924 {
3925  LogicalDecodingContext *ctx = rb->private_data;
3926 
3927  return ctx->streaming;
3928 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition,
Size  sz 
)
static

Definition at line 3153 of file reorderbuffer.c.

3156 {
3157  ReorderBufferTXN *txn;
3158  ReorderBufferTXN *toptxn;
3159 
3160  Assert(change->txn);
3161 
3162  /*
3163  * Ignore tuple CID changes, because those are not evicted when reaching
3164  * memory limit. So we just don't count them, because it might easily
3165  * trigger a pointless attempt to spill.
3166  */
3167  if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3168  return;
3169 
3170  txn = change->txn;
3171 
3172  /*
3173  * Update the total size in top level as well. This is later used to
3174  * compute the decoding stats.
3175  */
3176  if (txn->toptxn != NULL)
3177  toptxn = txn->toptxn;
3178  else
3179  toptxn = txn;
3180 
3181  if (addition)
3182  {
3183  txn->size += sz;
3184  rb->size += sz;
3185 
3186  /* Update the total size in the top transaction. */
3187  toptxn->total_size += sz;
3188  }
3189  else
3190  {
3191  Assert((rb->size >= sz) && (txn->size >= sz));
3192  txn->size -= sz;
3193  rb->size -= sz;
3194 
3195  /* Update the total size in the top transaction. */
3196  toptxn->total_size -= sz;
3197  }
3198 
3199  Assert(txn->size <= rb->size);
3200 }

References ReorderBufferChange::action, Assert(), REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4075 of file reorderbuffer.c.

4076 {
4077  Size sz = sizeof(ReorderBufferChange);
4078 
4079  switch (change->action)
4080  {
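4081  /* fall through these, they're all similar enough */
4082  case REORDER_BUFFER_CHANGE_INSERT:
4083  case REORDER_BUFFER_CHANGE_UPDATE:
4084  case REORDER_BUFFER_CHANGE_DELETE:
4085  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4086  {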
4087  ReorderBufferTupleBuf *oldtup,
4088  *newtup;
4089  Size oldlen = 0;
4090  Size newlen = 0;
4091 
4092  oldtup = change->data.tp.oldtuple;
4093  newtup = change->data.tp.newtuple;
4094 
4095  if (oldtup)
4096  {
4097  sz += sizeof(HeapTupleData);
4098  oldlen = oldtup->tuple.t_len;
4099  sz += oldlen;
4100  }
4101 
4102  if (newtup)
4103  {
4104  sz += sizeof(HeapTupleData);
4105  newlen = newtup->tuple.t_len;
4106  sz += newlen;
4107  }
4108 
4109  break;
4110  }
4111  case REORDER_BUFFER_CHANGE_MESSAGE:
4112  {
4113  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4114 
4115  sz += prefix_size + change->data.msg.message_size +
4116  sizeof(Size) + sizeof(Size);
4117 
4118  break;
4119  }
4120  case REORDER_BUFFER_CHANGE_INVALIDATION:
4121  {
4122  sz += sizeof(SharedInvalidationMessage) *
4123  change->data.inval.ninvalidations;
4124  break;
4125  }
4126  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4127  {
4128  Snapshot snap;
4129 
4130  snap = change->data.snapshot;
4131 
4132  sz += sizeof(SnapshotData) +
4133  sizeof(TransactionId) * snap->xcnt +
4134  sizeof(TransactionId) * snap->subxcnt;
4135 
4136  break;
4137  }
4138  case REORDER_BUFFER_CHANGE_TRUNCATE:
4139  {
4140  sz += sizeof(Oid) * change->data.truncate.nrelids;
4141 
4142  break;
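4143  }
4144  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4145  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4146  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4147  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4148  /* ReorderBufferChange contains everything important */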
4149  break;
4150  }
4151 
4152  return sz;
4153 }
size_t Size
Definition: c.h:541
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:67
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:58
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@97::@99 truncate
struct ReorderBufferChange::@97::@98 tp
HeapTupleData tuple
Definition: reorderbuffer.h:29
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3551 of file reorderbuffer.c.

3552 {
3553  ReorderBufferTXN *txn;
3554 
3555  /* bail out if we haven't exceeded the memory limit */
3556  if (rb->size < logical_decoding_work_mem * 1024L)
3557  return;
3558 
3559  /*
3560  * Loop until we reach under the memory limit. One might think that just
3561  * by evicting the largest (sub)transaction we will come under the memory
3562  * limit based on assumption that the selected transaction is at least as
3563  * large as the most recent change (which caused us to go over the memory
3564  * limit). However, that is not true because a user can reduce the
3565  * logical_decoding_work_mem to a smaller value before the most recent
3566  * change.
3567  */
3568  while (rb->size >= logical_decoding_work_mem * 1024L)
3569  {
3570  /*
3571  * Pick the largest transaction (or subtransaction) and evict it from
3572  * memory by streaming, if possible. Otherwise, spill to disk.
3573  */
3574  if (ReorderBufferCanStartStreaming(rb) &&
3575  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3576  {
3577  /* we know there has to be one, because the size is not zero */
3578  Assert(txn && !txn->toptxn);
3579  Assert(txn->total_size > 0);
3580  Assert(rb->size >= txn->total_size);
3581 
3582  ReorderBufferStreamTXN(rb, txn);
3583  }
3584  else
3585  {
3586  /*
3587  * Pick the largest transaction (or subtransaction) and evict it
3588  * from memory by serializing it to disk.
3589  */
3590  txn = ReorderBufferLargestTXN(rb);
3591 
3592  /* we know there has to be one, because the size is not zero */
3593  Assert(txn);
3594  Assert(txn->size > 0);
3595  Assert(rb->size >= txn->size);
3596 
3597  ReorderBufferSerializeTXN(rb, txn);
3598  }
3599 
3600  /*
3601  * After eviction, the transaction should have no entries in memory,
3602  * and should use 0 bytes for changes.
3603  */
3604  Assert(txn->size == 0);
3605  Assert(txn->nentries_mem == 0);
3606  }
3607 
3608  /* We must be under the memory limit now. */
3609  Assert(rb->size < logical_decoding_work_mem * 1024L);
3610 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

References Assert(), logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4501 of file reorderbuffer.c.

4502 {
4503  DIR *spill_dir;
4504  struct dirent *spill_de;
4505  struct stat statbuf;
4506  char path[MAXPGPATH * 2 + 12];
4507 
4508  sprintf(path, "pg_replslot/%s", slotname);
4509 
4510  /* we're only handling directories here, skip if it's not ours */
4511  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4512  return;
4513 
4514  spill_dir = AllocateDir(path);
4515  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4516  {
4517  /* only look at names that can be ours */
4518  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4519  {
4520  snprintf(path, sizeof(path),
4521  "pg_replslot/%s/%s", slotname,
4522  spill_de->d_name);
4523 
4524  if (unlink(path) != 0)
4525  ereport(ERROR,
4526  (errcode_for_file_access(),
4527  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4528  path, slotname)));
4529  }
4530  }
4531  FreeDir(spill_dir);
4532 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2761
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2724
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2643
#define snprintf
Definition: port.h:238
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:287
#define S_ISDIR(m)
Definition: win32_port.h:327

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1496 of file reorderbuffer.c.

1497 {
1498  bool found;
1499  dlist_mutable_iter iter;
1500 
1501  /* cleanup subtransactions & their changes */
1502  dlist_foreach_modify(iter, &txn->subtxns)
1503  {
1504  ReorderBufferTXN *subtxn;
1505 
1506  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1507 
1508  /*
1509  * Subtransactions are always associated to the toplevel TXN, even if
1510  * they originally were happening inside another subtxn, so we won't
1511  * ever recurse more than one level deep here.
1512  */
1513  Assert(rbtxn_is_known_subxact(subtxn));
1514  Assert(subtxn->nsubtxns == 0);
1515 
1516  ReorderBufferCleanupTXN(rb, subtxn);
1517  }
1518 
1519  /* cleanup changes in the txn */
1520  dlist_foreach_modify(iter, &txn->changes)
1521  {
1522  ReorderBufferChange *change;
1523 
1524  change = dlist_container(ReorderBufferChange, node, iter.cur);
1525 
1526  /* Check we're not mixing changes from different transactions. */
1527  Assert(change->txn == txn);
1528 
1529  ReorderBufferReturnChange(rb, change, true);
1530  }
1531 
1532  /*
1533  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1534  * They are always stored in the toplevel transaction.
1535  */
1536  dlist_foreach_modify(iter, &txn->tuplecids)
1537  {
1538  ReorderBufferChange *change;
1539 
1540  change = dlist_container(ReorderBufferChange, node, iter.cur);
1541 
1542  /* Check we're not mixing changes from different transactions. */
1543  Assert(change->txn == txn);
1544  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1545 
1546  ReorderBufferReturnChange(rb, change, true);
1547  }
1548 
1549  /*
1550  * Cleanup the base snapshot, if set.
1551  */
1552  if (txn->base_snapshot != NULL)
1553  {
1554  SnapBuildSnapDecRefcount(txn->base_snapshot);
1555  dlist_delete(&txn->base_snapshot_node);
1556  }
1557 
1558  /*
1559  * Cleanup the snapshot for the last streamed run.
1560  */
1561  if (txn->snapshot_now != NULL)
1562  {
1563  Assert(rbtxn_is_streamed(txn));
1564  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1565  }
1566 
1567  /*
1568  * Remove TXN from its containing lists.
1569  *
1570  * Note: if txn is known as subxact, we are deleting the TXN from its
1571  * parent's list of known subxacts; this leaves the parent's nsubxacts
1572  * count too high, but we don't care. Otherwise, we are deleting the TXN
1573  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1574  * list of catalog modifying transactions as well.
1575  */
1576  dlist_delete(&txn->node);
1577  if (rbtxn_has_catalog_changes(txn))
1578  dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
1579 
1580  /* now remove reference from buffer */
1581  hash_search(rb->by_txn,
1582  (void *) &txn->xid,
1583  HASH_REMOVE,
1584  &found);
1585  Assert(found);
1586 
1587  /* remove entries spilled to disk */
1588  if (rbtxn_is_serialized(txn))
1589  ReorderBufferRestoreCleanup(rb, txn);
1590 
1591  /* deallocate */
1592  ReorderBufferReturnTXN(rb, txn);
1593 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:713
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:453
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2682 of file reorderbuffer.c.

2686 {
2687  ReorderBufferTXN *txn;
2688 
2689  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2690  false);
2691 
2692  /* unknown transaction, nothing to replay */
2693  if (txn == NULL)
2694  return;
2695 
2696  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2697  origin_id, origin_lsn);
2698 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1180 of file reorderbuffer.c.

1183 {
1184  ReorderBufferTXN *subtxn;
1185 
1186  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1187  InvalidXLogRecPtr, false);
1188 
1189  /*
1190  * No need to do anything if that subtxn didn't contain any changes
1191  */
1192  if (!subtxn)
1193  return;
1194 
1195  subtxn->final_lsn = commit_lsn;
1196  subtxn->end_lsn = end_lsn;
1197 
1198  /*
1199  * Assign this subxact as a child of the toplevel xact (no-op if already
1200  * done.)
1201  */
1202  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1203 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1798 of file reorderbuffer.c.

1800 {
1801  Snapshot snap;
1802  dlist_iter iter;
1803  int i = 0;
1804  Size size;
1805 
1806  size = sizeof(SnapshotData) +
1807  sizeof(TransactionId) * orig_snap->xcnt +
1808  sizeof(TransactionId) * (txn->nsubtxns + 1);
1809 
1810  snap = MemoryContextAllocZero(rb->context, size);
1811  memcpy(snap, orig_snap, sizeof(SnapshotData));
1812 
1813  snap->copied = true;
1814  snap->active_count = 1; /* mark as active so nobody frees it */
1815  snap->regd_count = 0;
1816  snap->xip = (TransactionId *) (snap + 1);
1817 
1818  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1819 
1820  /*
1821  * snap->subxip contains all txids that belong to our transaction which we
1822  * need to check via cmin/cmax. That's why we store the toplevel
1823  * transaction in there as well.
1824  */
1825  snap->subxip = snap->xip + snap->xcnt;
1826  snap->subxip[i++] = txn->xid;
1827 
1828  /*
1829  * subxcnt isn't decreased when subtransactions abort, so count manually.
1830  * Since it's an upper boundary it is safe to use it for the allocation
1831  * above.
1832  */
1833  snap->subxcnt = 1;
1834 
1835  dlist_foreach(iter, &txn->subtxns)
1836  {
1837  ReorderBufferTXN *sub_txn;
1838 
1839  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1840  snap->subxip[i++] = sub_txn->xid;
1841  snap->subxcnt++;
1842  }
1843 
1844  /* sort so we can bsearch() later */
1845  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1846 
1847  /* store the specified current CommandId */
1848  snap->curcid = cid;
1849 
1850  return snap;
1851 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1037
#define qsort(a, b, c, d)
Definition: port.h:445
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3307 of file reorderbuffer.c.

3308 {
3309  int i;
3310 
3311  for (i = 0; i < nmsgs; i++)
3312  LocalExecuteInvalidationMessage(&msgs[i]);
3313 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:615

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2788 of file reorderbuffer.c.

2793 {
2794  ReorderBufferTXN *txn;
2795  XLogRecPtr prepare_end_lsn;
2796  TimestampTz prepare_time;
2797 
2798  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2799 
2800  /* unknown transaction, nothing to do */
2801  if (txn == NULL)
2802  return;
2803 
2804  /*
2805  * By this time the txn has the prepare record information, remember it to
2806  * be later used for rollback.
2807  */
2808  prepare_end_lsn = txn->end_lsn;
2809  prepare_time = txn->xact_time.prepare_time;
2810 
2811  /* add the gid in the txn */
2812  txn->gid = pstrdup(gid);
2813 
2814  /*
2815  * It is possible that this transaction is not decoded at prepare time
2816  * either because by that time we didn't have a consistent snapshot, or
2817  * two_phase was not enabled, or it was decoded earlier but we have
2818  * restarted. We only need to send the prepare if it was not decoded
2819  * earlier. We don't need to decode the xact for aborts if it is not done
2820  * already.
2821  */
2822  if ((txn->final_lsn < two_phase_at) && is_commit)
2823  {
2824  txn->txn_flags |= RBTXN_PREPARE;
2825 
2826  /*
2827  * The prepare info must have been updated in txn even if we skip
2828  * prepare.
2829  */
2830  Assert(txn->final_lsn != InvalidXLogRecPtr);
2831 
2832  /*
2833  * By this time the txn has the prepare record information and it is
2834  * important to use that so that downstream gets the accurate
2835  * information. If instead, we have passed commit information here
2836  * then downstream can behave as it has already replayed commit
2837  * prepared after the restart.
2838  */
2839  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2840  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2841  }
2842 
2843  txn->final_lsn = commit_lsn;
2844  txn->end_lsn = end_lsn;
2845  txn->xact_time.commit_time = commit_time;
2846  txn->origin_id = origin_id;
2847  txn->origin_lsn = origin_lsn;
2848 
2849  if (is_commit)
2850  rb->commit_prepared(rb, txn, commit_lsn);
2851  else
2852  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2853 
2854  /* cleanup: make sure there's no cache pollution */
2855  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2856  txn->invalidations);
2857  ReorderBufferCleanupTXN(rb, txn);
2858 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1483
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
union ReorderBufferTXN::@103 xact_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2958 of file reorderbuffer.c.

2959 {
2960  ReorderBufferTXN *txn;
2961 
2962  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2963  false);
2964 
2965  /* unknown, nothing to forget */
2966  if (txn == NULL)
2967  return;
2968 
2969  /* this transaction mustn't be streamed */
2970  Assert(!rbtxn_is_streamed(txn));
2971 
2972  /* cosmetic... */
2973  txn->final_lsn = lsn;
2974 
2975  /*
2976  * Process cache invalidation messages if there are any. Even if we're not
2977  * interested in the transaction's contents, it could have manipulated the
2978  * catalog and we need to update the caches according to that.
2979  */
2980  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2981  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2982  txn->invalidations);
2983  else
2984  Assert(txn->ninvalidations == 0);
2985 
2986  /* remove potential on-disk data, and deallocate */
2987  ReorderBufferCleanupTXN(rb, txn);
2988 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 385 of file reorderbuffer.c.

386 {
387  MemoryContext context = rb->context;
388 
389  /*
390  * We free separately allocated data by entirely scrapping reorderbuffer's
391  * memory context.
392  */
393  MemoryContextDelete(context);
394 
395  /* Free disk space used by unconsumed reorder buffers */
396  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
397 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1857 of file reorderbuffer.c.

1858 {
1859  if (snap->copied)
1860  pfree(snap);
1861  else
1862  SnapBuildSnapDecRefcount(snap);
1863 }
void pfree(void *pointer)
Definition: mcxt.c:1306

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3354 of file reorderbuffer.c.

3355 {
3356  dlist_iter iter;
3357  TransactionId *xids = NULL;
3358  size_t xcnt = 0;
3359 
3360  /* Quick return if the list is empty */
3361  if (dclist_count(&rb->catchange_txns) == 0)
3362  return NULL;
3363 
3364  /* Initialize XID array */
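3365  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3366  dclist_count(&rb->catchange_txns));
3367  dclist_foreach(iter, &rb->catchange_txns)
3368  {
3369  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3370  catchange_node,
3371  iter.cur);
3372 
3373  Assert(rbtxn_has_catalog_changes(txn));
3374 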
3375  xids[xcnt++] = txn->xid;
3376  }
3377 
3378  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3379 
3380  Assert(xcnt == dclist_count(&rb->catchange_txns));
3381  return xids;
3382 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:884
static uint32 dclist_count(dclist_head *head)
Definition: ilist.h:869
#define dclist_foreach(iter, lhead)
Definition: ilist.h:907

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1005 of file reorderbuffer.c.

1006 {
1007  ReorderBufferTXN *txn;
1008 
1009  AssertTXNLsnOrder(rb);
1010 
1011  if (dlist_is_empty(&rb->toplevel_by_lsn))
1012  return NULL;
1013 
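1014  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1015 
1016  Assert(!rbtxn_is_known_subxact(txn));
1017  Assert(txn->first_lsn != InvalidXLogRecPtr);
1018  return txn;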
1019 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:553

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 1033 of file reorderbuffer.c.

1034 {
1035  ReorderBufferTXN *txn;
1036 
1037  AssertTXNLsnOrder(rb);
1038 
1039  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1040  return InvalidTransactionId;
1041 
1042  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1043  &rb->txns_by_base_snapshot_lsn);
1044  return txn->base_snapshot->xmin;
1045 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 587 of file reorderbuffer.c.

588 {
589  Oid *relids;
590  Size alloc_len;
591 
592  alloc_len = sizeof(Oid) * nrelids;
593 
594  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
595 
596  return relids;
597 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 551 of file reorderbuffer.c.

552 {
553  ReorderBufferTupleBuf *tuple;
554  Size alloc_len;
555 
556  alloc_len = tuple_len + SizeofHeapTupleHeader;
557 
558  tuple = (ReorderBufferTupleBuf *)
559  MemoryContextAlloc(rb->tup_context,
560  sizeof(ReorderBufferTupleBuf) +
561  MAXIMUM_ALIGNOF + alloc_len);
562  tuple->alloc_tuple_size = alloc_len;
563  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
564 
565  return tuple;
566 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleHeader t_data
Definition: htup.h:68

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 403 of file reorderbuffer.c.

404 {
405  ReorderBufferTXN *txn;
406 
407  txn = (ReorderBufferTXN *)
408  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
409 
410  memset(txn, 0, sizeof(ReorderBufferTXN));
411 
412  dlist_init(&txn->changes);
413  dlist_init(&txn->tuplecids);
414  dlist_init(&txn->subtxns);
415 
416  /* InvalidCommandId is not zero, so set it explicitly */
417  txn->command_id = InvalidCommandId;
418  txn->output_plugin_private = NULL;
419 
420  return txn;
421 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3030 of file reorderbuffer.c.

3032 {
3033  bool use_subtxn = IsTransactionOrTransactionBlock();
3034  int i;
3035 
3036  if (use_subtxn)
3037  BeginInternalSubTransaction("replay");
3038 
3039  /*
3040  * Force invalidations to happen outside of a valid transaction - that way
3041  * entries will just be marked as invalid without accessing the catalog.
3042  * That's advantageous because we don't need to setup the full state
3043  * necessary for catalog access.
3044  */
3045  if (use_subtxn)
3046  AbortCurrentTransaction();
3047 
3048  for (i = 0; i < ninvalidations; i++)
3049  LocalExecuteInvalidationMessage(&invalidations[i]);
3050 
3051  if (use_subtxn)
3052  RollbackAndReleaseCurrentSubTransaction();
3053 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4814
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4520
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4625
void AbortCurrentTransaction(void)
Definition: xact.c:3293

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2999 of file reorderbuffer.c.

3000 {
3001  ReorderBufferTXN *txn;
3002 
3003  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3004  false);
3005 
3006  /* unknown, nothing to do */
3007  if (txn == NULL)
3008  return;
3009 
3010  /*
3011  * Process cache invalidation messages if there are any. Even if we're not
3012  * interested in the transaction's contents, it could have manipulated the
3013  * catalog and we need to update the caches according to that.
3014  */
3015  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3016  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3017  txn->invalidations);
3018  else
3019  Assert(txn->ninvalidations == 0);
3020 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1222 of file reorderbuffer.c.

1223 {
1224  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1225  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1226  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1227 
1228  if (pos_a < pos_b)
1229  return 1;
1230  else if (pos_a == pos_b)
1231  return 0;
1232  return -1;
1233 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:550
Definition: regguts.h:318

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1465 of file reorderbuffer.c.

1467 {
1468  int32 off;
1469 
1470  for (off = 0; off < state->nr_txns; off++)
1471  {
1472  if (state->entries[off].file.vfd != -1)
1473  FileClose(state->entries[off].file.vfd);
1474  }
1475 
1476  /* free memory we might have "leaked" in the last *Next call */
1477  if (!dlist_is_empty(&state->old_change))
1478  {
1479  ReorderBufferChange *change;
1480 
1481  change = dlist_container(ReorderBufferChange, node,
1482  dlist_pop_head_node(&state->old_change));
1483  ReorderBufferReturnChange(rb, change, true);
1484  Assert(dlist_is_empty(&state->old_change));
1485  }
1486 
1487  binaryheap_free(state->heap);
1488  pfree(state);
1489 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:68
void FileClose(File file)
Definition: fd.c:1883
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:415

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1245 of file reorderbuffer.c.

1247 {
1248  Size nr_txns = 0;
1249  ReorderBufferIterTXNState *state;
1250  dlist_iter cur_txn_i;
1251  int32 off;
1252 
1253  *iter_state = NULL;
1254 
1255  /* Check ordering of changes in the toplevel transaction. */
1256  AssertChangeLsnOrder(txn);
1257 
1258  /*
1259  * Calculate the size of our heap: one element for every transaction that
1260  * contains changes. (Besides the transactions already in the reorder
1261  * buffer, we count the one we were directly passed.)
1262  */
1263  if (txn->nentries > 0)
1264  nr_txns++;
1265 
1266  dlist_foreach(cur_txn_i, &txn->subtxns)
1267  {
1268  ReorderBufferTXN *cur_txn;
1269 
1270  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1271 
1272  /* Check ordering of changes in this subtransaction. */
1273  AssertChangeLsnOrder(cur_txn);
1274 
1275  if (cur_txn->nentries > 0)
1276  nr_txns++;
1277  }
1278 
1279  /* allocate iteration state */
1280  state = (ReorderBufferIterTXNState *)
1281  MemoryContextAllocZero(rb->context,
1282  sizeof(ReorderBufferIterTXNState) +
1283  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1284 
1285  state->nr_txns = nr_txns;
1286  dlist_init(&state->old_change);
1287 
1288  for (off = 0; off < state->nr_txns; off++)
1289  {
1290  state->entries[off].file.vfd = -1;
1291  state->entries[off].segno = 0;
1292  }
1293 
1294  /* allocate heap */
1295  state->heap = binaryheap_allocate(state->nr_txns,
1296  ReorderBufferIterCompare,
1297  state);
1298 
1299  /* Now that the state fields are initialized, it is safe to return it. */
1300  *iter_state = state;
1301 
1302  /*
1303  * Now insert items into the binary heap, in an unordered fashion. (We
1304  * will run a heap assembly step at the end; this is more efficient.)
1305  */
1306 
1307  off = 0;
1308 
1309  /* add toplevel transaction if it contains changes */
1310  if (txn->nentries > 0)
1311  {
1312  ReorderBufferChange *cur_change;
1313 
1314  if (rbtxn_is_serialized(txn))
1315  {
1316  /* serialize remaining changes */
1317  ReorderBufferSerializeTXN(rb, txn);
1318  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1319  &state->entries[off].segno);
1320  }
1321 
1322  cur_change = dlist_head_element(ReorderBufferChange, node,
1323  &txn->changes);
1324 
1325  state->entries[off].lsn = cur_change->lsn;
1326  state->entries[off].change = cur_change;
1327  state->entries[off].txn = txn;
1328 
1329  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1330  }
1331 
1332  /* add subtransactions if they contain changes */
1333  dlist_foreach(cur_txn_i, &txn->subtxns)
1334  {
1335  ReorderBufferTXN *cur_txn;
1336 
1337  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1338 
1339  if (cur_txn->nentries > 0)
1340  {
1341  ReorderBufferChange *cur_change;
1342 
1343  if (rbtxn_is_serialized(cur_txn))
1344  {
1345  /* serialize remaining changes */
1346  ReorderBufferSerializeTXN(rb, cur_txn);
1347  ReorderBufferRestoreChanges(rb, cur_txn,
1348  &state->entries[off].file,
1349  &state->entries[off].segno);
1350  }
1351  cur_change = dlist_head_element(ReorderBufferChange, node,
1352  &cur_txn->changes);
1353 
1354  state->entries[off].lsn = cur_change->lsn;
1355  state->entries[off].change = cur_change;
1356  state->entries[off].txn = cur_txn;
1357 
1358  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1359  }
1360  }
1361 
1362  /* assemble a valid binary heap */
1363  binaryheap_build(state->heap);
1364 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:125
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:109
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:32
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:560
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1373 of file reorderbuffer.c.

1374 {
1375  ReorderBufferChange *change;
1376  ReorderBufferIterTXNEntry *entry;
1377  int32 off;
1378 
1379  /* nothing there anymore */
1380  if (state->heap->bh_size == 0)
1381  return NULL;
1382 
1383  off = DatumGetInt32(binaryheap_first(state->heap));
1384  entry = &state->entries[off];
1385 
1386  /* free memory we might have "leaked" in the previous *Next call */
1387  if (!dlist_is_empty(&state->old_change))
1388  {
1389  change = dlist_container(ReorderBufferChange, node,
1390  dlist_pop_head_node(&state->old_change));
1391  ReorderBufferReturnChange(rb, change, true);
1392  Assert(dlist_is_empty(&state->old_change));
1393  }
1394 
1395  change = entry->change;
1396 
1397  /*
1398  * update heap with information about which transaction has the next
1399  * relevant change in LSN order
1400  */
1401 
1402  /* there are in-memory changes */
1403  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1404  {
1405  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1406  ReorderBufferChange *next_change =
1407  dlist_container(ReorderBufferChange, node, next);
1408 
1409  /* txn stays the same */
1410  state->entries[off].lsn = next_change->lsn;
1411  state->entries[off].change = next_change;
1412 
1413  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1414  return change;
1415  }
1416 
1417  /* try to load changes from disk */
1418  if (entry->txn->nentries != entry->txn->nentries_mem)
1419  {
1420  /*
1421  * Ugly: restoring changes will reuse *Change records, thus delete the
1422  * current one from the per-tx list and only free in the next call.
1423  */
1424  dlist_delete(&change->node);
1425  dlist_push_tail(&state->old_change, &change->node);
1426 
1427  /*
1428  * Update the total bytes processed by the txn for which we are
1429  * releasing the current set of changes and restoring the new set of
1430  * changes.
1431  */
1432  rb->totalBytes += entry->txn->size;
1433  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1434  &state->entries[off].segno))
1435  {
1436  /* successfully restored changes from disk */
1437  ReorderBufferChange *next_change =
1438  dlist_head_element(ReorderBufferChange, node,
1439  &entry->txn->changes);
1440 
1441  elog(DEBUG2, "restored %u/%u changes from disk",
1442  (uint32) entry->txn->nentries_mem,
1443  (uint32) entry->txn->nentries);
1444 
1445  Assert(entry->txn->nentries_mem);
1446  /* txn stays the same */
1447  state->entries[off].lsn = next_change->lsn;
1448  state->entries[off].change = next_change;
1449  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1450 
1451  return change;
1452  }
1453  }
1454 
1455  /* ok, no changes there anymore, remove */
1456  binaryheap_remove_first(state->heap);
1457 
1458  return change;
1459 }
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:173
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:207
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:158
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:442
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:487
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:468
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog(), ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3510 of file reorderbuffer.c.

3511 {
3512  dlist_iter iter;
3513  Size largest_size = 0;
3514  ReorderBufferTXN *largest = NULL;
3515 
3516  /* Find the largest top-level transaction having a base snapshot. */
3517  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3518  {
3519  ReorderBufferTXN *txn;
3520 
3521  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3522 
3523  /* must not be a subtxn */
3524  Assert(!rbtxn_is_known_subxact(txn));
3525  /* base_snapshot must be set */
3526  Assert(txn->base_snapshot != NULL);
3527 
3528  if ((largest == NULL || txn->total_size > largest_size) &&
3529  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3530  rbtxn_has_streamable_change(txn))
3531  {
3532  largest = txn;
3533  largest_size = txn->total_size;
3534  }
3535  }
3536 
3537  return largest;
3538 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3462 of file reorderbuffer.c.

3463 {
3464  HASH_SEQ_STATUS hash_seq;
3465  ReorderBufferTXNByIdEnt *ent;
3466  ReorderBufferTXN *largest = NULL;
3467 
3468  hash_seq_init(&hash_seq, rb->by_txn);
3469  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3470  {
3471  ReorderBufferTXN *txn = ent->txn;
3472 
3473  /* if the current transaction is larger, remember it */
3474  if ((!largest) || (txn->size > largest->size))
3475  largest = txn;
3476  }
3477 
3478  Assert(largest);
3479  Assert(largest->size > 0);
3480  Assert(largest->size <= rb->size);
3481 
3482  return largest;
3483 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
ReorderBufferTXN * txn

References Assert(), ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2751 of file reorderbuffer.c.

2753 {
2754  ReorderBufferTXN *txn;
2755 
2756  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2757  false);
2758 
2759  /* unknown transaction, nothing to replay */
2760  if (txn == NULL)
2761  return;
2762 
2763  txn->txn_flags |= RBTXN_PREPARE;
2764  txn->gid = pstrdup(gid);
2765 
2766  /* The prepare info must have been updated in txn by now. */
2767  Assert(txn->final_lsn != InvalidXLogRecPtr);
2768 
2769  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2770  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2771 
2772  /*
2773  * We send the prepare for the concurrently aborted xacts so that later
2774  * when rollback prepared is decoded and sent, the downstream should be
2775  * able to rollback such a xact. See comments atop DecodePrepare.
2776  *
2777  * Note, for the concurrent_abort + streaming case a stream_prepare was
2778  * already sent within the ReorderBufferReplay call above.
2779  */
2780  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2781  rb->prepare(rb, txn, txn->final_lsn);
2782 }
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 703 of file reorderbuffer.c.

706 {
707  ReorderBufferTXN *toptxn;
708 
709  /*
710  * The partial changes need to be processed only while streaming
711  * in-progress transactions.
712  */
713  if (!ReorderBufferCanStream(rb))
714  return;
715 
716  /* Get the top transaction. */
717  if (txn->toptxn != NULL)
718  toptxn = txn->toptxn;
719  else
720  toptxn = txn;
721 
722  /*
723  * Indicate a partial change for toast inserts. The change will be
724  * considered as complete once we get the insert or update on the main
725  * table and we are sure that the pending toast chunks are not required
726  * anymore.
727  *
728  * If we allow streaming when there are pending toast chunks then such
729  * chunks won't be released till the insert (multi_insert) is complete and
730  * we expect the txn to have streamed all changes after streaming. This
731  * restriction is mainly to ensure the correctness of streamed
732  * transactions and it doesn't seem worth uplifting such a restriction
733  * just to allow this case because anyway we will stream the transaction
734  * once such an insert is complete.
735  */
736  if (toast_insert)
737  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
738  else if (rbtxn_has_partial_change(toptxn) &&
739  IsInsertOrUpdate(change->action) &&
740  change->data.tp.clear_toast_afterwards)
741  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
742 
743  /*
744  * Indicate a partial change for speculative inserts. The change will be
745  * considered as complete once we get the speculative confirm or abort
746  * token.
747  */
748  if (IsSpecInsert(change->action))
749  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
750  else if (rbtxn_has_partial_change(toptxn) &&
751  IsSpecConfirmOrAbort(change->action))
752  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
753 
754  /*
755  * Stream the transaction if it is serialized before and the changes are
756  * now complete in the top-level transaction.
757  *
758  * The reason for doing the streaming of such a transaction as soon as we
759  * get the complete change for it is that previously it would have reached
760  * the memory threshold and wouldn't get streamed because of incomplete
761  * changes. Delaying such transactions would increase apply lag for them.
762  */
763  if (ReorderBufferCanStartStreaming(rb) &&
764  !(rbtxn_has_partial_change(toptxn)) &&
765  rbtxn_is_serialized(txn) &&
766  rbtxn_has_streamable_change(toptxn))
767  ReorderBufferStreamTXN(rb, toptxn);
768 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2071 of file reorderbuffer.c.

/*
 * ReorderBufferProcessTXN -- replay the queued changes of txn through the
 * output plugin callbacks, either as a whole transaction or as one streamed
 * chunk.
 *
 * rb            reorder buffer whose callbacks (begin, begin_prepare,
 *               stream_start, stream_stop, prepare, commit) are invoked
 * txn           transaction whose changes are iterated
 * commit_lsn    passed through to the prepare/commit callback
 * snapshot_now  historic snapshot installed before the first change; it is
 *               replaced when a snapshot change is seen, or copied when the
 *               command id advances
 * command_id    command id tracked alongside snapshot_now
 * streaming     true when sending a chunk of an in-progress transaction:
 *               stream_start/stream_stop are used instead of
 *               begin/commit (or begin_prepare/prepare)
 *
 * If an error with ERRCODE_TRANSACTION_ROLLBACK is caught while a stream is
 * open or while txn is prepared, curtxn is flagged concurrent_abort, txn is
 * reset via ReorderBufferResetTXN() and the function returns normally. Any
 * other error cleans up txn and is re-thrown.
 *
 * NOTE(review): this listing omits several source lines (the line numbers
 * skip, e.g. 2078, 2086, 2110, 2130, 2168, ...), including the case labels
 * of the switch and some statements. Consult the real file before relying
 * on the control flow shown here.
 */
2076 {
2077  bool using_subtxn;
2079  ReorderBufferIterTXNState *volatile iterstate = NULL;
2080  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2081  ReorderBufferChange *volatile specinsert = NULL;
2082  volatile bool stream_started = false;
2083  ReorderBufferTXN *volatile curtxn = NULL;
2084 
2085  /* build data to be able to lookup the CommandIds of catalog tuples */
2087 
2088  /* setup the initial snapshot */
2089  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2090 
2091  /*
2092  * Decoding needs access to syscaches et al., which in turn use
2093  * heavyweight locks and such. Thus we need to have enough state around to
2094  * keep track of those. The easiest way is to simply use a transaction
2095  * internally. That also allows us to easily enforce that nothing writes
2096  * to the database by checking for xid assignments.
2097  *
2098  * When we're called via the SQL SRF there's already a transaction
2099  * started, so start an explicit subtransaction there.
2100  */
2101  using_subtxn = IsTransactionOrTransactionBlock();
2102 
2103  PG_TRY();
2104  {
2105  ReorderBufferChange *change;
2106 
2107  if (using_subtxn)
2108  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2109  else
2111 
2112  /*
2113  * We only need to send begin/begin-prepare for non-streamed
2114  * transactions.
2115  */
2116  if (!streaming)
2117  {
2118  if (rbtxn_prepared(txn))
2119  rb->begin_prepare(rb, txn);
2120  else
2121  rb->begin(rb, txn);
2122  }
2123 
2124  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2125  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2126  {
2127  Relation relation = NULL;
2128  Oid reloid;
2129 
2131 
2132  /*
2133  * We can't call start stream callback before processing first
2134  * change.
2135  */
2136  if (prev_lsn == InvalidXLogRecPtr)
2137  {
2138  if (streaming)
2139  {
2140  txn->origin_id = change->origin_id;
2141  rb->stream_start(rb, txn, change->lsn);
2142  stream_started = true;
2143  }
2144  }
2145 
2146  /*
2147  * Enforce correct ordering of changes, merged from multiple
2148  * subtransactions. The changes may have the same LSN due to
2149  * MULTI_INSERT xlog records.
2150  */
2151  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2152 
2153  prev_lsn = change->lsn;
2154 
2155  /*
2156  * Set the current xid to detect concurrent aborts. This is
2157  * required for the cases when we decode the changes before the
2158  * COMMIT record is processed.
2159  */
2160  if (streaming || rbtxn_prepared(change->txn))
2161  {
2162  curtxn = change->txn;
2163  SetupCheckXidLive(curtxn->xid);
2164  }
2165 
2166  switch (change->action)
2167  {
2169 
2170  /*
2171  * Confirmation for speculative insertion arrived. Simply
2172  * use as a normal record. It'll be cleaned up at the end
2173  * of INSERT processing.
2174  */
2175  if (specinsert == NULL)
2176  elog(ERROR, "invalid ordering of speculative insertion changes");
2177  Assert(specinsert->data.tp.oldtuple == NULL);
2178  change = specinsert;
2180 
2181  /* intentionally fall through */
2185  Assert(snapshot_now);
2186 
2187  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2188  change->data.tp.rlocator.relNumber);
2189 
2190  /*
2191  * Mapped catalog tuple without data, emitted while
2192  * catalog table was in the process of being rewritten. We
2193  * can fail to look up the relfilenumber, because the
2194  * relmapper has no "historic" view, in contrast to the
2195  * normal catalog during decoding. Thus repeated rewrites
2196  * can cause a lookup failure. That's OK because we do not
2197  * decode catalog changes anyway. Normally such tuples
2198  * would be skipped over below, but we can't identify
2199  * whether the table should be logically logged without
2200  * mapping the relfilenumber to the oid.
2201  */
2202  if (reloid == InvalidOid &&
2203  change->data.tp.newtuple == NULL &&
2204  change->data.tp.oldtuple == NULL)
2205  goto change_done;
2206  else if (reloid == InvalidOid)
2207  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2208  relpathperm(change->data.tp.rlocator,
2209  MAIN_FORKNUM));
2210 
2211  relation = RelationIdGetRelation(reloid);
2212 
2213  if (!RelationIsValid(relation))
2214  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2215  reloid,
2216  relpathperm(change->data.tp.rlocator,
2217  MAIN_FORKNUM));
2218 
2219  if (!RelationIsLogicallyLogged(relation))
2220  goto change_done;
2221 
2222  /*
2223  * Ignore temporary heaps created during DDL unless the
2224  * plugin has asked for them.
2225  */
2226  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2227  goto change_done;
2228 
2229  /*
2230  * For now ignore sequence changes entirely. Most of the
2231  * time they don't log changes using records we
2232  * understand, so it doesn't make sense to handle the few
2233  * cases we do.
2234  */
2235  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2236  goto change_done;
2237 
2238  /* user-triggered change */
2239  if (!IsToastRelation(relation))
2240  {
2241  ReorderBufferToastReplace(rb, txn, relation, change);
2242  ReorderBufferApplyChange(rb, txn, relation, change,
2243  streaming);
2244 
2245  /*
2246  * Only clear reassembled toast chunks if we're sure
2247  * they're not required anymore. The creator of the
2248  * tuple tells us.
2249  */
2250  if (change->data.tp.clear_toast_afterwards)
2251  ReorderBufferToastReset(rb, txn);
2252  }
2253  /* we're not interested in toast deletions */
2254  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2255  {
2256  /*
2257  * Need to reassemble the full toasted Datum in
2258  * memory. To ensure the chunks don't get reused
2259  * till we're done, remove it from the list of this
2260  * transaction's changes. Otherwise it will get
2261  * freed/reused while restoring spooled data from
2262  * disk.
2263  */
2264  Assert(change->data.tp.newtuple != NULL);
2265 
2266  dlist_delete(&change->node);
2267  ReorderBufferToastAppendChunk(rb, txn, relation,
2268  change);
2269  }
2270 
2271  change_done:
2272 
2273  /*
2274  * If speculative insertion was confirmed, the record
2275  * isn't needed anymore.
2276  */
2277  if (specinsert != NULL)
2278  {
2279  ReorderBufferReturnChange(rb, specinsert, true);
2280  specinsert = NULL;
2281  }
2282 
2283  if (RelationIsValid(relation))
2284  {
2285  RelationClose(relation);
2286  relation = NULL;
2287  }
2288  break;
2289 
2291 
2292  /*
2293  * Speculative insertions are dealt with by delaying the
2294  * processing of the insert until the confirmation record
2295  * arrives. For that we simply unlink the record from the
2296  * chain, so it does not get freed/reused while restoring
2297  * spooled data from disk.
2298  *
2299  * This is safe in the face of concurrent catalog changes
2300  * because the relevant relation can't be changed between
2301  * speculative insertion and confirmation due to
2302  * CheckTableNotInUse() and locking.
2303  */
2304 
2305  /* clear out a pending (and thus failed) speculation */
2306  if (specinsert != NULL)
2307  {
2308  ReorderBufferReturnChange(rb, specinsert, true);
2309  specinsert = NULL;
2310  }
2311 
2312  /* and memorize the pending insertion */
2313  dlist_delete(&change->node);
2314  specinsert = change;
2315  break;
2316 
2318 
2319  /*
2320  * Abort for speculative insertion arrived. So clean up the
2321  * specinsert tuple and toast hash.
2322  *
2323  * Note that we get the spec abort change for each toast
2324  * entry but we need to perform the cleanup only the first
2325  * time we get it for the main table.
2326  */
2327  if (specinsert != NULL)
2328  {
2329  /*
2330  * We must clean the toast hash before processing a
2331  * completely new tuple to avoid confusion about the
2332  * previous tuple's toast chunks.
2333  */
2334  Assert(change->data.tp.clear_toast_afterwards);
2335  ReorderBufferToastReset(rb, txn);
2336 
2337  /* We don't need this record anymore. */
2338  ReorderBufferReturnChange(rb, specinsert, true);
2339  specinsert = NULL;
2340  }
2341  break;
2342 
2344  {
2345  int i;
2346  int nrelids = change->data.truncate.nrelids;
2347  int nrelations = 0;
2348  Relation *relations;
2349 
2350  relations = palloc0(nrelids * sizeof(Relation));
2351  for (i = 0; i < nrelids; i++)
2352  {
2353  Oid relid = change->data.truncate.relids[i];
2354  Relation rel;
2355 
2356  rel = RelationIdGetRelation(relid);
2357 
2358  if (!RelationIsValid(rel))
2359  elog(ERROR, "could not open relation with OID %u", relid);
2360 
2361  if (!RelationIsLogicallyLogged(rel))
2362  continue;
2363 
2364  relations[nrelations++] = rel;
2365  }
2366 
2367  /* Apply the truncate. */
2368  ReorderBufferApplyTruncate(rb, txn, nrelations,
2369  relations, change,
2370  streaming);
2371 
2372  for (i = 0; i < nrelations; i++)
2373  RelationClose(relations[i]);
2374 
2375  break;
2376  }
2377 
2379  ReorderBufferApplyMessage(rb, txn, change, streaming);
2380  break;
2381 
2383  /* Execute the invalidation messages locally */
2384  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2385  change->data.inval.invalidations);
2386  break;
2387 
2389  /* get rid of the old */
2390  TeardownHistoricSnapshot(false);
2391 
2392  if (snapshot_now->copied)
2393  {
2394  ReorderBufferFreeSnap(rb, snapshot_now);
2395  snapshot_now =
2396  ReorderBufferCopySnap(rb, change->data.snapshot,
2397  txn, command_id);
2398  }
2399 
2400  /*
2401  * Restored from disk, need to be careful not to double
2402  * free. We could introduce refcounting for that, but for
2403  * now this seems infrequent enough not to care.
2404  */
2405  else if (change->data.snapshot->copied)
2406  {
2407  snapshot_now =
2408  ReorderBufferCopySnap(rb, change->data.snapshot,
2409  txn, command_id);
2410  }
2411  else
2412  {
2413  snapshot_now = change->data.snapshot;
2414  }
2415 
2416  /* and continue with the new one */
2417  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2418  break;
2419 
2421  Assert(change->data.command_id != InvalidCommandId);
2422 
2423  if (command_id < change->data.command_id)
2424  {
2425  command_id = change->data.command_id;
2426 
2427  if (!snapshot_now->copied)
2428  {
2429  /* we don't use the global one anymore */
2430  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2431  txn, command_id);
2432  }
2433 
2434  snapshot_now->curcid = command_id;
2435 
2436  TeardownHistoricSnapshot(false);
2437  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2438  }
2439 
2440  break;
2441 
2443  elog(ERROR, "tuplecid value in changequeue");
2444  break;
2445  }
2446  }
2447 
2448  /* speculative insertion record must be freed by now */
2449  Assert(!specinsert);
2450 
2451  /* clean up the iterator */
2452  ReorderBufferIterTXNFinish(rb, iterstate);
2453  iterstate = NULL;
2454 
2455  /*
2456  * Update total transaction count and total bytes processed by the
2457  * transaction and its subtransactions. Make sure not to count the
2458  * streamed transaction multiple times.
2459  *
2460  * Note that the statistics computation has to be done after
2461  * ReorderBufferIterTXNFinish as it releases the serialized change
2462  * which we have already accounted in ReorderBufferIterTXNNext.
2463  */
2464  if (!rbtxn_is_streamed(txn))
2465  rb->totalTxns++;
2466 
2467  rb->totalBytes += txn->total_size;
2468 
2469  /*
2470  * Done with current changes, send the last message for this set of
2471  * changes depending upon streaming mode.
2472  */
2473  if (streaming)
2474  {
2475  if (stream_started)
2476  {
2477  rb->stream_stop(rb, txn, prev_lsn);
2478  stream_started = false;
2479  }
2480  }
2481  else
2482  {
2483  /*
2484  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2485  * regular ones).
2486  */
2487  if (rbtxn_prepared(txn))
2488  rb->prepare(rb, txn, commit_lsn);
2489  else
2490  rb->commit(rb, txn, commit_lsn);
2491  }
2492 
2493  /* this is just a sanity check against bad output plugin behaviour */
2495  elog(ERROR, "output plugin used XID %u",
2497 
2498  /*
2499  * Remember the command ID and snapshot for the next set of changes in
2500  * streaming mode.
2501  */
2502  if (streaming)
2503  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2504  else if (snapshot_now->copied)
2505  ReorderBufferFreeSnap(rb, snapshot_now);
2506 
2507  /* cleanup */
2508  TeardownHistoricSnapshot(false);
2509 
2510  /*
2511  * Aborting the current (sub-)transaction as a whole has the right
2512  * semantics. We want all locks acquired in here to be released, not
2513  * reassigned to the parent and we do not want any database access
2514  * to have persistent effects.
2515  */
2517 
2518  /* make sure there's no cache pollution */
2520 
2521  if (using_subtxn)
2523 
2524  /*
2525  * We are here due to one of the four reasons: 1. Decoding an
2526  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2527  * prepared txn that was (partially) streamed. 4. Decoding a committed
2528  * txn.
2529  *
2530  * For 1, we allow truncation of txn data by removing the changes
2531  * already streamed but still keeping other things like invalidations,
2532  * snapshot, and tuplecids. For 2 and 3, we indicate
2533  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2534  * data as the entire transaction has been decoded except for commit.
2535  * For 4, as the entire txn has been decoded, we can fully clean up
2536  * the TXN reorder buffer.
2537  */
2538  if (streaming || rbtxn_prepared(txn))
2539  {
2541  /* Reset the CheckXidAlive */
2543  }
2544  else
2545  ReorderBufferCleanupTXN(rb, txn);
2546  }
2547  PG_CATCH();
2548  {
2549  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2550  ErrorData *errdata = CopyErrorData();
2551 
2552  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2553  if (iterstate)
2554  ReorderBufferIterTXNFinish(rb, iterstate);
2555 
2557 
2558  /*
2559  * Force cache invalidation to happen outside of a valid transaction
2560  * to prevent catalog access as we just caught an error.
2561  */
2563 
2564  /* make sure there's no cache pollution */
2566  txn->invalidations);
2567 
2568  if (using_subtxn)
2570 
2571  /*
2572  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2573  * abort of the (sub)transaction we are streaming or preparing. We
2574  * need to do the cleanup and return gracefully on this error, see
2575  * SetupCheckXidLive.
2576  *
2577  * This error code can be thrown by one of the callbacks we call
2578  * during decoding so we need to ensure that we return gracefully only
2579  * when we are sending the data in streaming mode and the streaming is
2580  * not finished yet or when we are sending the data out on a PREPARE
2581  * during a two-phase commit.
2582  */
2583  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2584  (stream_started || rbtxn_prepared(txn)))
2585  {
2586  /* curtxn must be set for streaming or prepared transactions */
2587  Assert(curtxn);
2588 
2589  /* Clean up the temporary error state. */
2590  FlushErrorState();
2591  FreeErrorData(errdata);
2592  errdata = NULL;
2593  curtxn->concurrent_abort = true;
2594 
2595  /* Reset the TXN so that it is allowed to stream remaining data. */
2596  ReorderBufferResetTXN(rb, txn, snapshot_now,
2597  command_id, prev_lsn,
2598  specinsert);
2599  }
2600  else
2601  {
2602  ReorderBufferCleanupTXN(rb, txn);
2603  MemoryContextSwitchTo(ecxt);
2604  PG_RE_THROW();
2605  }
2606  }
2607  PG_END_TRY();
2608 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:147
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1776
void FlushErrorState(void)
Definition: elog.c:1825
ErrorData * CopyErrorData(void)
Definition: elog.c:1720
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
void * palloc0(Size size)
Definition: mcxt.c:1230
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:699
#define RelationIsValid(relation)
Definition: rel.h:474
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2054
void RelationClose(Relation relation)
Definition: relcache.c:2160
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:50
#define relpathperm(rlocator, forknum)
Definition: relpath.h:90
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2096
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2080
int sqlerrcode
Definition: elog.h:438
Form_pg_class rd_rel
Definition: rel.h:110
RepOriginId origin_id
Definition: reorderbuffer.h:90
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:98
void StartTransactionCommand(void)
Definition: xact.c:2925
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:461
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:444

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog(), ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), 
ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3066 of file reorderbuffer.c.

/*
 * ReorderBufferProcessXid -- when xid is valid, look it up through
 * ReorderBufferTXNByXid(), passing lsn along. The lookup's return value is
 * not used here; nothing happens for InvalidTransactionId.
 */
3067 {
3068  /* many records won't have an xid assigned, so centralize the check here */
3069  if (xid != InvalidTransactionId)
3070  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3071 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 775 of file reorderbuffer.c.

/*
 * ReorderBufferQueueChange -- append change to the change list of the
 * transaction identified by xid.
 *
 * The change is returned to the buffer (and not queued) when the
 * transaction is already flagged concurrent_abort. Otherwise the change is
 * stamped with lsn and its owning txn, pushed onto txn->changes, counted in
 * nentries/nentries_mem, added to the memory accounting and handed to
 * ReorderBufferProcessPartialChange() together with toast_insert.
 *
 * NOTE(review): this listing omits source lines 803-807, 817 and 836 (the
 * rest of the streamable-change condition, the statement inside that block,
 * and the statement after the final comment -- presumably the
 * ReorderBufferCheckMemoryLimit() call; confirm against the real file).
 */
777 {
778  ReorderBufferTXN *txn;
779 
780  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
781 
782  /*
783  * While streaming the previous changes we have detected that the
784  * transaction is aborted. So there is no point in collecting further
785  * changes for it.
786  */
787  if (txn->concurrent_abort)
788  {
789  /*
790  * We don't need to update memory accounting for this change as we
791  * have not added it to the queue yet.
792  */
793  ReorderBufferReturnChange(rb, change, false);
794  return;
795  }
796 
797  /*
798  * The changes that are sent downstream are considered streamable. We
799  * remember such transactions so that only those will later be considered
800  * for streaming.
801  */
802  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
808  {
809  ReorderBufferTXN *toptxn;
810 
811  /* get the top transaction (txn itself when it has no toptxn) */
812  if (txn->toptxn != NULL)
813  toptxn = txn->toptxn;
814  else
815  toptxn = txn;
816 
818  }
819 
820  change->lsn = lsn;
821  change->txn = txn;
822 
823  Assert(InvalidXLogRecPtr != lsn);
824  dlist_push_tail(&txn->changes, &change->node);
825  txn->nentries++;
826  txn->nentries_mem++;
827 
828  /* update memory accounting information */
829  ReorderBufferChangeMemoryUpdate(rb, change, true,
830  ReorderBufferChangeSize(change));
831 
832  /* process partial change */
833  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
834 
835  /* check the memory limits and evict something if needed */
837 }
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 844 of file reorderbuffer.c.

/*
 * ReorderBufferQueueMessage -- handle a logical message.
 *
 * transactional: a change is obtained from rb with rb->context as the
 * current memory context, the prefix and the message bytes are copied into
 * it, and it is queued for xid via ReorderBufferQueueChange().
 *
 * non-transactional: rb->message() is invoked immediately, with snap set up
 * as the historic snapshot. txn stays NULL when xid is
 * InvalidTransactionId.
 *
 * NOTE(review): this listing omits source lines 854, 859, 883 and 887;
 * TeardownHistoricSnapshot() and REORDER_BUFFER_CHANGE_MESSAGE appear in
 * the reference list for this function, so the omitted lines presumably
 * tear down the snapshot and set change->action -- confirm against the
 * real file.
 */
848 {
849  if (transactional)
850  {
851  MemoryContext oldcontext;
852  ReorderBufferChange *change;
853 
855 
856  oldcontext = MemoryContextSwitchTo(rb->context);
857 
858  change = ReorderBufferGetChange(rb);
860  change->data.msg.prefix = pstrdup(prefix);
861  change->data.msg.message_size = message_size;
862  change->data.msg.message = palloc(message_size);
863  memcpy(change->data.msg.message, message, message_size);
864 
865  ReorderBufferQueueChange(rb, xid, lsn, change, false);
866 
867  MemoryContextSwitchTo(oldcontext);
868  }
869  else
870  {
871  ReorderBufferTXN *txn = NULL;
872  volatile Snapshot snapshot_now = snap;
873 
874  if (xid != InvalidTransactionId)
875  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
876 
877  /* setup snapshot to allow catalog access */
878  SetupHistoricSnapshot(snapshot_now, NULL);
879  PG_TRY();
880  {
881  rb->message(rb, txn, lsn, false, prefix, message_size, message);
882 
884  }
885  PG_CATCH();
886  {
888  PG_RE_THROW();
889  }
890  PG_END_TRY();
891  }
892 }

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2704 of file reorderbuffer.c.

/*
 * ReorderBufferRememberPrepareInfo -- record the PREPARE details on the
 * transaction identified by xid.
 *
 * Returns false, storing nothing, when ReorderBufferTXNByXid() yields NULL
 * for xid. Otherwise copies prepare_lsn (into final_lsn), end_lsn,
 * prepare_time, origin_id and origin_lsn into the txn and returns true.
 */
2708 {
2709  ReorderBufferTXN *txn;
2710 
2711  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2712 
2713  /* unknown transaction, nothing to do */
2714  if (txn == NULL)
2715  return false;
2716 
2717  /*
2718  * Remember the prepare information to be later used by commit prepared in
2719  * case we skip doing prepare.
2720  */
2721  txn->final_lsn = prepare_lsn;
2722  txn->end_lsn = end_lsn;
2723  txn->xact_time.prepare_time = prepare_time;
2724  txn->origin_id = origin_id;
2725  txn->origin_lsn = origin_lsn;
2726 
2727  return true;
2728 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2621 of file reorderbuffer.c.

/*
 * ReorderBufferReplay -- store the commit details on txn and send its
 * changes to the output plugin.
 *
 * Three outcomes, checked in this order:
 *  - txn was already streamed: hand over to ReorderBufferStreamCommit();
 *  - txn has no base snapshot: nothing to decode, so the txn is cleaned up
 *    (unless it is prepared) and the function returns;
 *  - otherwise: ReorderBufferProcessTXN() is run with the base snapshot,
 *    FirstCommandId and streaming = false.
 *
 * The xid parameter is not referenced in the body shown here.
 */
2626 {
2627  Snapshot snapshot_now;
2628  CommandId command_id = FirstCommandId;
2629 
2630  txn->final_lsn = commit_lsn;
2631  txn->end_lsn = end_lsn;
2632  txn->xact_time.commit_time = commit_time;
2633  txn->origin_id = origin_id;
2634  txn->origin_lsn = origin_lsn;
2635 
2636  /*
2637  * If the transaction was (partially) streamed, we need to commit it in a
2638  * 'streamed' way. That is, we first stream the remaining part of the
2639  * transaction, and then invoke stream_commit message.
2640  *
2641  * Called after everything (origin ID, LSN, ...) is stored in the
2642  * transaction to avoid passing that information directly.
2643  */
2644  if (rbtxn_is_streamed(txn))
2645  {
2646  ReorderBufferStreamCommit(rb, txn);
2647  return;
2648  }
2649 
2650  /*
2651  * If this transaction has no snapshot, it didn't make any changes to the
2652  * database, so there's nothing to decode. Note that
2653  * ReorderBufferCommitChild will have transferred any snapshots from
2654  * subtransactions if there were any.
2655  */
2656  if (txn->base_snapshot == NULL)
2657  {
2658  Assert(txn->ninvalidations == 0);
2659 
2660  /*
2661  * Removing this txn before a commit might result in the computation
2662  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2663  */
2664  if (!rbtxn_prepared(txn))
2665  ReorderBufferCleanupTXN(rb, txn);
2666  return;
2667  }
2668 
2669  snapshot_now = txn->base_snapshot;
2670 
2671  /* Process and send the changes to output plugin. */
2672  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2673  command_id, false);
2674 }
#define FirstCommandId
Definition: c.h:604
uint32 CommandId
Definition: c.h:602
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 2028 of file reorderbuffer.c.

/*
 * ReorderBufferResetTXN -- reset txn after a replay was cut short, so that
 * its remaining data can still be streamed later.
 *
 * Frees the toast reconstruction state, returns a pending speculative
 * insert (if any) to the buffer and, when txn is streamed, calls
 * stream_stop with last_lsn and saves snapshot_now/command_id for the next
 * streaming run.
 *
 * NOTE(review): this listing omits source line 2035, the statement that
 * follows the first comment. ReorderBufferTruncateTXN() and rbtxn_prepared
 * appear in the reference list for this function, so that is presumably
 * the call made there -- confirm against the real file.
 */
2033 {
2034  /* Discard the changes that we just streamed */
2036 
2037  /* Free all resources allocated for toast reconstruction */
2038  ReorderBufferToastReset(rb, txn);
2039 
2040  /* Return the spec insert change if it is not NULL */
2041  if (specinsert != NULL)
2042  {
2043  ReorderBufferReturnChange(rb, specinsert, true);
2044  specinsert = NULL; /* clears only this function's copy of the pointer */
2045  }
2046 
2047  /*
2048  * For the streaming case, stop the stream and remember the command ID and
2049  * snapshot for the streaming run.
2050  */
2051  if (rbtxn_is_streamed(txn))
2052  {
2053  rb->stream_stop(rb, txn, last_lsn);
2054  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2055  }
2056 }

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  data 
)
static

Definition at line 4303 of file reorderbuffer.c.

4305 {
4306  ReorderBufferDiskChange *ondisk;
4307  ReorderBufferChange *change;
4308 
4309  ondisk = (ReorderBufferDiskChange *) data;
4310 
4311  change = ReorderBufferGetChange(rb);
4312 
4313  /* copy static part */
4314  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4315 
4316  data += sizeof(ReorderBufferDiskChange);
4317 
4318  /* restore individual stuff */
4319  switch (change->action)
4320  {
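4321  /* fall through these, they're all similar enough */
4322  case REORDER_BUFFER_CHANGE_INSERT:
4323  case REORDER_BUFFER_CHANGE_UPDATE:
4324  case REORDER_BUFFER_CHANGE_DELETE:
4325  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: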
4326  if (change->data.tp.oldtuple)
4327  {
4328  uint32 tuplelen = ((HeapTuple) data)->t_len;
4329 
4330  change->data.tp.oldtuple =
4331  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4332 
4333  /* restore ->tuple */
4334  memcpy(&change->data.tp.oldtuple->tuple, data,
4335  sizeof(HeapTupleData));
4336  data += sizeof(HeapTupleData);
4337 
4338  /* reset t_data pointer into the new tuplebuf */
4339  change->data.tp.oldtuple->tuple.t_data =
4340  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4341 
4342  /* restore tuple data itself */
4343  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4344  data += tuplelen;
4345  }
4346 
4347  if (change->data.tp.newtuple)
4348  {
4349  /* here, data might not be suitably aligned! */
4350  uint32 tuplelen;
4351 
4352  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4353  sizeof(uint32));
4354 
4355  change->data.tp.newtuple =
4356  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4357 
4358  /* restore ->tuple */
4359  memcpy(&change->data.tp.newtuple->tuple, data,
4360  sizeof(HeapTupleData));
4361  data += sizeof(HeapTupleData);
4362 
4363  /* reset t_data pointer into the new tuplebuf */
4364  change->data.tp.newtuple->tuple.t_data =
4365  ReorderBufferTupleBufData(change->data.tp.newtuple);
4366 
4367  /* restore tuple data itself */
4368  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4369  data += tuplelen;
4370  }
4371 
4372  break;
4373  case REORDER_BUFFER_CHANGE_MESSAGE:
4374  {
4375  Size prefix_size;
4376 
4377  /* read prefix */
4378  memcpy(&prefix_size, data, sizeof(Size));
4379  data += sizeof(Size);
4380  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4381  prefix_size);
4382  memcpy(change->data.msg.prefix, data, prefix_size);
4383  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4384  data += prefix_size;
4385 
4386  /* read the message */
4387  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4388  data += sizeof(Size);
4389  change->data.msg.message = MemoryContextAlloc(rb->context,
4390  change->data.msg.message_size);
4391  memcpy(change->data.msg.message, data,
4392  change->data.msg.message_size);
4393  data += change->data.msg.message_size;
4394 
4395  break;
4396  }
4397  case REORDER_BUFFER_CHANGE_INVALIDATION:
4398  {
4399  Size inval_size = sizeof(SharedInvalidationMessage) *
4400  change->data.inval.ninvalidations;
4401 
4402  change->data.inval.invalidations =
4403  MemoryContextAlloc(rb->context, inval_size);
4404 
4405  /* read the message */
4406  memcpy(change->data.inval.invalidations, data, inval_size);
4407 
4408  break;
4409  }
4410  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4411  {
4412  Snapshot oldsnap;
4413  Snapshot newsnap;
4414  Size size;
4415 
4416  oldsnap = (Snapshot) data;
4417 
4418  size = sizeof(SnapshotData) +
4419  sizeof(TransactionId) * oldsnap->xcnt +
4420  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4421 
4422  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4423 
4424  newsnap = change->data.snapshot;
4425 
4426  memcpy(newsnap, data, size);
4427  newsnap->xip = (TransactionId *)
4428  (((char *) newsnap) + sizeof(SnapshotData));
4429  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4430  newsnap->copied = true;
4431  break;
4432  }
4433  /* the base struct contains all the data, easy peasy */
4434  case REORDER_BUFFER_CHANGE_TRUNCATE:
4435  {
4436  Oid *relids;
4437 
4438  relids = ReorderBufferGetRelids(rb,
4439  change->data.truncate.nrelids);
4440  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4441  change->data.truncate.relids = relids;
4442 
4443  break;
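4444  }
4445  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4446  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4447  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4448  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: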
4449  break;
4450  }
4451 
4452  dlist_push_tail(&txn->changes, &change->node);
4453  txn->nentries_mem++;
4454 
4455  /*
4456  * Update memory accounting for the restored change. We need to do this
4457  * although we don't check the memory limit when restoring the changes in
4458  * this branch (we only do that when initially queueing the changes after
4459  * decoding), because we will release the changes later, and that will
4460  * update the accounting too (subtracting the size from the counters). And
4461  * we don't want to underflow there.
4462  */
4463  ReorderBufferChangeMemoryUpdate(rb, change, true,
4464  ReorderBufferChangeSize(change));
4465 }
HeapTupleData * HeapTuple
Definition: htup.h:71
struct ReorderBufferDiskChange ReorderBufferDiskChange
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4160 of file reorderbuffer.c.

4162 {
4163  Size restored = 0;
4164  XLogSegNo last_segno;
4165  dlist_mutable_iter cleanup_iter;
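4166  File *fd = &file->vfd;
4167 
4168  Assert(txn->first_lsn != InvalidXLogRecPtr);
4169  Assert(txn->final_lsn != InvalidXLogRecPtr);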
4170 
4171  /* free current entries, so we have memory for more */
4172  dlist_foreach_modify(cleanup_iter, &txn->changes)
4173  {
4174  ReorderBufferChange *cleanup =
4175  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4176 
4177  dlist_delete(&cleanup->node);
4178  ReorderBufferReturnChange(rb, cleanup, true);
4179  }
4180  txn->nentries_mem = 0;
4181  Assert(dlist_is_empty(&txn->changes));
4182 
4183  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4184 
4185  while (restored < max_changes_in_memory && *segno <= last_segno)
4186  {
4187  int readBytes;
4188  ReorderBufferDiskChange *ondisk;
4189 
4190  CHECK_FOR_INTERRUPTS();
4191 
4192  if (*fd == -1)
4193  {
4194  char path[MAXPGPATH];
4195 
4196  /* first time in */
4197  if (*segno == 0)
4198  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4199 
4200  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4201 
4202  /*
4203  * No need to care about TLIs here, only used during a single run,
4204  * so each LSN only maps to a specific WAL record.
4205  */
4206  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4207  *segno);
4208 
4209  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4210 
4211  /* No harm in resetting the offset even in case of failure */
4212  file->curOffset = 0;
4213 
4214  if (*fd < 0 && errno == ENOENT)
4215  {
4216  *fd = -1;
4217  (*segno)++;
4218  continue;
4219  }
4220  else if (*fd < 0)
4221  ereport(ERROR,
4222  (errcode_for_file_access(),
4223  errmsg("could not open file \"%s\": %m",
4224  path)));
4225  }
4226 
4227  /*
4228  * Read the statically sized part of a change which has information
4229  * about the total size. If we couldn't read a record, we're at the
4230  * end of this file.
4231  */
4232  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4233  readBytes = FileRead(file->vfd, rb->outbuf,
4234  sizeof(ReorderBufferDiskChange),
4235  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4236 
4237  /* eof */
4238  if (readBytes == 0)
4239  {
4240  FileClose(*fd);
4241  *fd = -1;
4242  (*segno)++;
4243  continue;
4244  }
4245  else if (readBytes < 0)
4246  ereport(ERROR,
4247  (errcode_for_file_access(),
4248  errmsg("could not read from reorderbuffer spill file: %m")));
4249  else if (readBytes != sizeof(ReorderBufferDiskChange))
4250  ereport(ERROR,
4251  (errcode_for_file_access(),
4252  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4253  readBytes,
4254  (uint32) sizeof(ReorderBufferDiskChange))));
4255 
4256  file->curOffset += readBytes;
4257 
4258  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4259 
4260  ReorderBufferSerializeReserve(rb,
4261  sizeof(ReorderBufferDiskChange) + ondisk->size);
4262  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4263 
4264  readBytes = FileRead(file->vfd,
4265  rb->outbuf + sizeof(ReorderBufferDiskChange),
4266  ondisk->size - sizeof(ReorderBufferDiskChange),
4267  file->curOffset,
4268  WAIT_EVENT_REORDER_BUFFER_READ);
4269 
4270  if (readBytes < 0)
4271  ereport(ERROR,
4272  (errcode_for_file_access(),
4273  errmsg("could not read from reorderbuffer spill file: %m")));
4274  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4275  ereport(ERROR,
4276  (errcode_for_file_access(),
4277  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4278  readBytes,
4279  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4280 
4281  file->curOffset += readBytes;
4282 
4283  /*
4284  * ok, read a full change from disk, now restore it into proper
4285  * in-memory format
4286  */
4287  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4288  restored++;
4289  }
4290 
4291  return restored;
4292 }
static void cleanup(void)
Definition: bootstrap.c:696
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1488
int FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2034
int File
Definition: fd.h:54
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
@ WAIT_EVENT_REORDER_BUFFER_READ
Definition: wait_event.h:200
int wal_segment_size
Definition: xlog.c:146
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4471 of file reorderbuffer.c.

4472 {
4473  XLogSegNo first;
4474  XLogSegNo cur;
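4475  XLogSegNo last;
4476 
4477  Assert(txn->first_lsn != InvalidXLogRecPtr);
4478  Assert(txn->final_lsn != InvalidXLogRecPtr);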
4479 
4480  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4481  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4482 
4483  /* iterate over all possible filenames, and delete them */
4484  for (cur = first; cur <= last; cur++)
4485  {
4486  char path[MAXPGPATH];
4487 
4488  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4489  if (unlink(path) != 0 && errno != ENOENT)
4490  ereport(ERROR,
4491  (errcode_for_file_access(),
4492  errmsg("could not remove file \"%s\": %m", path)));
4493  }
4494 }
struct cursor * cur
Definition: ecpg.c:28

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 481 of file reorderbuffer.c.

483 {
484  /* update memory accounting info */
485  if (upd_mem)
486  ReorderBufferChangeMemoryUpdate(rb, change, false,
487  ReorderBufferChangeSize(change));
488 
489  /* free contained data */
490  switch (change->action)
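491  {
492  case REORDER_BUFFER_CHANGE_INSERT:
493  case REORDER_BUFFER_CHANGE_UPDATE:
494  case REORDER_BUFFER_CHANGE_DELETE:
495  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: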
496  if (change->data.tp.newtuple)
497  {
498  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
499  change->data.tp.newtuple = NULL;
500  }
501 
502  if (change->data.tp.oldtuple)
503  {
504  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
505  change->data.tp.oldtuple = NULL;
506  }
507  break;
508  case REORDER_BUFFER_CHANGE_MESSAGE:
509  if (change->data.msg.prefix != NULL)
510  pfree(change->data.msg.prefix);
511  change->data.msg.prefix = NULL;
512  if (change->data.msg.message != NULL)
513  pfree(change->data.msg.message);
514  change->data.msg.message = NULL;
515  break;
516  case REORDER_BUFFER_CHANGE_INVALIDATION:
517  if (change->data.inval.invalidations)
518  pfree(change->data.inval.invalidations);
519  change->data.inval.invalidations = NULL;
520  break;
521  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
522  if (change->data.snapshot)
523  {
524  ReorderBufferFreeSnap(rb, change->data.snapshot);
525  change->data.snapshot = NULL;
526  }
527  break;
528  /* no data in addition to the struct itself */
529  case REORDER_BUFFER_CHANGE_TRUNCATE:
530  if (change->data.truncate.relids != NULL)
531  {
532  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
533  change->data.truncate.relids = NULL;
534  }
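535  break;
536  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
537  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
538  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
539  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: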
540  break;
541  }
542 
543  pfree(change);
544 }
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 603 of file reorderbuffer.c.

604 {
605  pfree(relids);
606 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 572 of file reorderbuffer.c.

573 {
574  pfree(tuple);
575 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 427 of file reorderbuffer.c.

428 {
429  /* clean the lookup cache if we were cached (quite likely) */
430  if (rb->by_txn_last_xid == txn->xid)
431  {
432  rb->by_txn_last_xid = InvalidTransactionId;
433  rb->by_txn_last_txn = NULL;
434  }
435 
436  /* free data that's contained */
437 
438  if (txn->gid != NULL)
439  {
440  pfree(txn->gid);
441  txn->gid = NULL;
442  }
443 
444  if (txn->tuplecid_hash != NULL)
445  {
446  hash_destroy(txn->tuplecid_hash);
447  txn->tuplecid_hash = NULL;
448  }
449 
450  if (txn->invalidations)
451  {
452  pfree(txn->invalidations);
453  txn->invalidations = NULL;
454  }
455 
456  /* Reset the toast hash */
457  ReorderBufferToastReset(rb, txn);
458 
459  pfree(txn);
460 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2007 of file reorderbuffer.c.

2009 {
2010  txn->command_id = command_id;
2011 
2012  /* Avoid copying if it's already copied. */
2013  if (snapshot_now->copied)
2014  txn->snapshot_now = snapshot_now;
2015  else
2016  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2017  txn, command_id);
2018 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 3708 of file reorderbuffer.c.

3710 {
3711  ReorderBufferDiskChange *ondisk;
3712  Size sz = sizeof(ReorderBufferDiskChange);
3713 
3714  ReorderBufferSerializeReserve(rb, sz);
3715 
3716  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3717  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3718 
3719  switch (change->action)
3720  {
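3721  /* fall through these, they're all similar enough */
3722  case REORDER_BUFFER_CHANGE_INSERT:
3723  case REORDER_BUFFER_CHANGE_UPDATE:
3724  case REORDER_BUFFER_CHANGE_DELETE:
3725  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: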
3726  {
3727  char *data;
3728  ReorderBufferTupleBuf *oldtup,
3729  *newtup;
3730  Size oldlen = 0;
3731  Size newlen = 0;
3732 
3733  oldtup = change->data.tp.oldtuple;
3734  newtup = change->data.tp.newtuple;
3735 
3736  if (oldtup)
3737  {
3738  sz += sizeof(HeapTupleData);
3739  oldlen = oldtup->tuple.t_len;
3740  sz += oldlen;
3741  }
3742 
3743  if (newtup)
3744  {
3745  sz += sizeof(HeapTupleData);
3746  newlen = newtup->tuple.t_len;
3747  sz += newlen;
3748  }
3749 
3750  /* make sure we have enough space */
3751  ReorderBufferSerializeReserve(rb, sz);
3752 
3753  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3754  /* might have been reallocated above */
3755  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3756 
3757  if (oldlen)
3758  {
3759  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3760  data += sizeof(HeapTupleData);
3761 
3762  memcpy(data, oldtup->tuple.t_data, oldlen);
3763  data += oldlen;
3764  }
3765 
3766  if (newlen)
3767  {
3768  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3769  data += sizeof(HeapTupleData);
3770 
3771  memcpy(data, newtup->tuple.t_data, newlen);
3772  data += newlen;
3773  }
3774  break;
3775  }
3776  case REORDER_BUFFER_CHANGE_MESSAGE:
3777  {
3778  char *data;
3779  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3780 
3781  sz += prefix_size + change->data.msg.message_size +
3782  sizeof(Size) + sizeof(Size);
3783  ReorderBufferSerializeReserve(rb, sz);
3784 
3785  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3786 
3787  /* might have been reallocated above */
3788  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3789 
3790  /* write the prefix including the size */
3791  memcpy(data, &prefix_size, sizeof(Size));
3792  data += sizeof(Size);
3793  memcpy(data, change->data.msg.prefix,
3794  prefix_size);
3795  data += prefix_size;
3796 
3797  /* write the message including the size */
3798  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3799  data += sizeof(Size);
3800  memcpy(data, change->data.msg.message,
3801  change->data.msg.message_size);
3802  data += change->data.msg.message_size;
3803 
3804  break;
3805  }
3806  case REORDER_BUFFER_CHANGE_INVALIDATION:
3807  {
3808  char *data;
3809  Size inval_size = sizeof(SharedInvalidationMessage) *
3810  change->data.inval.ninvalidations;
3811 
3812  sz += inval_size;
3813 
3814  ReorderBufferSerializeReserve(rb, sz);
3815  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3816 
3817  /* might have been reallocated above */
3818  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3819  memcpy(data, change->data.inval.invalidations, inval_size);
3820  data += inval_size;
3821 
3822  break;
3823  }
3824  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3825  {
3826  Snapshot snap;
3827  char *data;
3828 
3829  snap = change->data.snapshot;
3830 
3831  sz += sizeof(SnapshotData) +
3832  sizeof(TransactionId) * snap->xcnt +
3833  sizeof(TransactionId) * snap->subxcnt;
3834 
3835  /* make sure we have enough space */
3836  ReorderBufferSerializeReserve(rb, sz);
3837  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3838  /* might have been reallocated above */
3839  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3840 
3841  memcpy(data, snap, sizeof(SnapshotData));
3842  data += sizeof(SnapshotData);
3843 
3844  if (snap->xcnt)
3845  {
3846  memcpy(data, snap->xip,
3847  sizeof(TransactionId) * snap->xcnt);
3848  data += sizeof(TransactionId) * snap->xcnt;
3849  }
3850 
3851  if (snap->subxcnt)
3852  {
3853  memcpy(data, snap->subxip,
3854  sizeof(TransactionId) * snap->subxcnt);
3855  data += sizeof(TransactionId) * snap->subxcnt;
3856  }
3857  break;
3858  }
3859  case REORDER_BUFFER_CHANGE_TRUNCATE:
3860  {
3861  Size size;
3862  char *data;
3863 
3864  /* account for the OIDs of truncated relations */
3865  size = sizeof(Oid) * change->data.truncate.nrelids;
3866  sz += size;
3867 
3868  /* make sure we have enough space */
3869  ReorderBufferSerializeReserve(rb, sz);
3870 
3871  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3872  /* might have been reallocated above */
3873  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3874 
3875  memcpy(data, change->data.truncate.relids, size);
3876  data += size;
3877 
3878  break;
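3879  }
3880  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3881  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3882  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3883  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: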
3884  /* ReorderBufferChange contains everything important */
3885  break;
3886  }
3887 
3888  ondisk->size = sz;
3889 
3890  errno = 0;
3891  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3892  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3893  {
3894  int save_errno = errno;
3895 
3896  CloseTransientFile(fd);
3897 
3898  /* if write didn't set errno, assume problem is no disk space */
3899  errno = save_errno ? save_errno : ENOSPC;
3900  ereport(ERROR,
3901  (errcode_for_file_access(),
3902  errmsg("could not write to data file for XID %u: %m",
3903  txn->xid)));
3904  }
3905  pgstat_report_wait_end();
3906 
3907  /*
3908  * Keep the transaction's final_lsn up to date with each change we send to
3909  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3910  * only do this on commit and abort records, but that doesn't work if a
3911  * system crash leaves a transaction without its abort record).
3912  *
3913  * Make sure not to move it backwards.
3914  */
3915  if (txn->final_lsn < change->lsn)
3916  txn->final_lsn = change->lsn;
3917 
3918  Assert(ondisk->change.action == change->action);
3919 }
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ WAIT_EVENT_REORDER_BUFFER_WRITE
Definition: wait_event.h:201

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4540 of file reorderbuffer.c.

4542 {
4543  XLogRecPtr recptr;
4544 
4545  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4546 
4547  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4548  NameStr(MyReplicationSlot->data.name),
4549  xid, LSN_FORMAT_ARGS(recptr));
4550 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 3436 of file reorderbuffer.c.

3437 {
3438  if (!rb->outbufsize)
3439  {
3440  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3441  rb->outbufsize = sz;
3442  }
3443  else if (rb->outbufsize < sz)
3444  {
3445  rb->outbuf = repalloc(rb->outbuf, sz);
3446  rb->outbufsize = sz;
3447  }
3448 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3616 of file reorderbuffer.c.

3617 {
3618  dlist_iter subtxn_i;
3619  dlist_mutable_iter change_i;
3620  int fd = -1;
3621  XLogSegNo curOpenSegNo = 0;
3622  Size spilled = 0;
3623  Size size = txn->size;
3624 
3625  elog(DEBUG2, "spill %u changes in XID %u to disk",
3626  (uint32) txn->nentries_mem, txn->xid);
3627 
3628  /* do the same to all child TXs */
3629  dlist_foreach(subtxn_i, &txn->subtxns)
3630  {
3631  ReorderBufferTXN *subtxn;
3632 
3633  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3634  ReorderBufferSerializeTXN(rb, subtxn);
3635  }
3636 
3637  /* serialize changestream */
3638  dlist_foreach_modify(change_i, &txn->changes)
3639  {
3640  ReorderBufferChange *change;
3641 
3642  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3643 
3644  /*
3645  * store in segment in which it belongs by start lsn, don't split over
3646  * multiple segments tho
3647  */
3648  if (fd == -1 ||
3649  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3650  {
3651  char path[MAXPGPATH];
3652 
3653  if (fd != -1)
3654  CloseTransientFile(fd);
3655 
3656  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3657 
3658  /*
3659  * No need to care about TLIs here, only used during a single run,
3660  * so each LSN only maps to a specific WAL record.
3661  */
3662  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3663  curOpenSegNo);
3664 
3665  /* open segment, create it if necessary */
3666  fd = OpenTransientFile(path,
3667  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3668 
3669  if (fd < 0)
3670  ereport(ERROR,
3671  (errcode_for_file_access(),
3672  errmsg("could not open file \"%s\": %m", path)));
3673  }
3674 
3675  ReorderBufferSerializeChange(rb, txn, fd, change);
3676  dlist_delete(&change->node);
3677  ReorderBufferReturnChange(rb, change, true);
3678 
3679  spilled++;
3680  }
3681 
3682  /* update the statistics iff we have spilled anything */
3683  if (spilled)
3684  {
3685  rb->spillCount += 1;
3686  rb->spillBytes += size;
3687 
3688  /* don't consider already serialized transactions */
3689  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3690 
3691  /* update the decoding stats */
3692  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3693  }
3694 
3695  Assert(spilled == txn->nentries_mem);
3696  Assert(dlist_is_empty(&txn->changes));
3697  txn->nentries_mem = 0;
3698  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3699 
3700  if (fd != -1)
3701  CloseTransientFile(fd);
3702 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1833
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3097 of file reorderbuffer.c.

3099 {
3100  ReorderBufferTXN *txn;
3101  bool is_new;
3102 
3103  Assert(snap != NULL);
3104 
3105  /*
3106  * Fetch the transaction to operate on. If we know it's a subtransaction,
3107  * operate on its top-level transaction instead.
3108  */
3109  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3110  if (rbtxn_is_known_subxact(txn))
3111  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3112  NULL, InvalidXLogRecPtr, false);
3113  Assert(txn->base_snapshot == NULL);
3114 
3115  txn->base_snapshot = snap;
3116  txn->base_snapshot_lsn = lsn;
3117  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3118 
3119  AssertTXNLsnOrder(rb);
3120 }

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1048 of file reorderbuffer.c.

1049 {
1050  rb->current_restart_decoding_lsn = ptr;
1051 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2732 of file reorderbuffer.c.

2733 {
2734  ReorderBufferTXN *txn;
2735 
2736  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2737 
2738  /* unknown transaction, nothing to do */
2739  if (txn == NULL)
2740  return;
2741 
2742  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2743 }
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1872 of file reorderbuffer.c.

1873 {
1874  /* we should only call this for previously streamed transactions */
1875  Assert(rbtxn_is_streamed(txn));
1876 
1877  ReorderBufferStreamTXN(rb, txn);
1878 
1879  if (rbtxn_prepared(txn))
1880  {
1881  /*
1882  * Note, we send stream prepare even if a concurrent abort is
1883  * detected. See DecodePrepare for more information.
1884  */
1885  rb->stream_prepare(rb, txn, txn->final_lsn);
1886 
1887  /*
1888  * This is a PREPARED transaction, part of a two-phase commit. The
1889  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1890  * just truncate txn by removing changes and tuple_cids.
1891  */
1892  ReorderBufferTruncateTXN(rb, txn, true);
1893  /* Reset the CheckXidAlive */
1894  CheckXidAlive = InvalidTransactionId;
1895  }
1896  else
1897  {
1898  rb->stream_commit(rb, txn, txn->final_lsn);
1899  ReorderBufferCleanupTXN(rb, txn);
1900  }
1901 }
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer rb,
ReorderBufferTXN txn