PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
OidReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionIdReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 190 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
( \
)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:74
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:75

Definition at line 185 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 181 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5018 of file reorderbuffer.c.

5019 {
5020  char path[MAXPGPATH];
5021  int fd;
5022  int readBytes;
5024 
5025  sprintf(path, "pg_logical/mappings/%s", fname);
5026  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5027  if (fd < 0)
5028  ereport(ERROR,
5030  errmsg("could not open file \"%s\": %m", path)));
5031 
5032  while (true)
5033  {
5036  ReorderBufferTupleCidEnt *new_ent;
5037  bool found;
5038 
5039  /* be careful about padding */
5040  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5041 
5042  /* read all mappings till the end of the file */
5043  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5044  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5046 
5047  if (readBytes < 0)
5048  ereport(ERROR,
5050  errmsg("could not read file \"%s\": %m",
5051  path)));
5052  else if (readBytes == 0) /* EOF */
5053  break;
5054  else if (readBytes != sizeof(LogicalRewriteMappingData))
5055  ereport(ERROR,
5057  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5058  path, readBytes,
5059  (int32) sizeof(LogicalRewriteMappingData))));
5060 
5061  key.rlocator = map.old_locator;
5062  ItemPointerCopy(&map.old_tid,
5063  &key.tid);
5064 
5065 
5066  ent = (ReorderBufferTupleCidEnt *)
5068 
5069  /* no existing mapping, no need to update */
5070  if (!ent)
5071  continue;
5072 
5073  key.rlocator = map.new_locator;
5074  ItemPointerCopy(&map.new_tid,
5075  &key.tid);
5076 
5077  new_ent = (ReorderBufferTupleCidEnt *)
5079 
5080  if (found)
5081  {
5082  /*
5083  * Make sure the existing mapping makes sense. We sometime update
5084  * old records that did not yet have a cmax (e.g. pg_class' own
5085  * entry while rewriting it) during rewrites, so allow that.
5086  */
5087  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5088  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5089  }
5090  else
5091  {
5092  /* update mapping */
5093  new_ent->cmin = ent->cmin;
5094  new_ent->cmax = ent->cmax;
5095  new_ent->combocid = ent->combocid;
5096  }
5097  }
5098 
5099  if (CloseTransientFile(fd) != 0)
5100  ereport(ERROR,
5102  errmsg("could not close file \"%s\": %m", path)));
5103 }
#define InvalidCommandId
Definition: c.h:658
signed int int32
Definition: c.h:483
#define PG_BINARY
Definition: c.h:1283
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
int errcode_for_file_access(void)
Definition: elog.c:881
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2754
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2578
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static HTAB * tuplecid_data
Definition: snapmgr.c:108
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 978 of file reorderbuffer.c.

979 {
980 #ifdef USE_ASSERT_CHECKING
981  dlist_iter iter;
982  XLogRecPtr prev_lsn = txn->first_lsn;
983 
984  dlist_foreach(iter, &txn->changes)
985  {
986  ReorderBufferChange *cur_change;
987 
988  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
989 
991  Assert(cur_change->lsn != InvalidXLogRecPtr);
992  Assert(txn->first_lsn <= cur_change->lsn);
993 
994  if (txn->end_lsn != InvalidXLogRecPtr)
995  Assert(cur_change->lsn <= txn->end_lsn);
996 
997  Assert(prev_lsn <= cur_change->lsn);
998 
999  prev_lsn = cur_change->lsn;
1000  }
1001 #endif
1002 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 907 of file reorderbuffer.c.

908 {
909 #ifdef USE_ASSERT_CHECKING
911  dlist_iter iter;
912  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
913  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
914 
915  /*
916  * Skip the verification if we don't reach the LSN at which we start
917  * decoding the contents of transactions yet because until we reach the
918  * LSN, we could have transactions that don't have the association between
919  * the top-level transaction and subtransaction yet and consequently have
920  * the same LSN. We don't guarantee this association until we try to
921  * decode the actual contents of transaction. The ordering of the records
922  * prior to the start_decoding_at LSN should have been checked before the
923  * restart.
924  */
926  return;
927 
928  dlist_foreach(iter, &rb->toplevel_by_lsn)
929  {
931  iter.cur);
932 
933  /* start LSN must be set */
934  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
935 
936  /* If there is an end LSN, it must be higher than start LSN */
937  if (cur_txn->end_lsn != InvalidXLogRecPtr)
938  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
939 
940  /* Current initial LSN must be strictly higher than previous */
941  if (prev_first_lsn != InvalidXLogRecPtr)
942  Assert(prev_first_lsn < cur_txn->first_lsn);
943 
944  /* known-as-subtxn txns must not be listed */
945  Assert(!rbtxn_is_known_subxact(cur_txn));
946 
947  prev_first_lsn = cur_txn->first_lsn;
948  }
949 
951  {
953  base_snapshot_node,
954  iter.cur);
955 
956  /* base snapshot (and its LSN) must be set */
957  Assert(cur_txn->base_snapshot != NULL);
959 
960  /* current LSN must be strictly higher than previous */
961  if (prev_base_snap_lsn != InvalidXLogRecPtr)
962  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
963 
964  /* known-as-subtxn txns must not be listed */
965  Assert(!rbtxn_is_known_subxact(cur_txn));
966 
967  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
968  }
969 #endif
970 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:434
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 5120 of file reorderbuffer.c.

5121 {
5124 
5125  if (a->lsn < b->lsn)
5126  return -1;
5127  else if (a->lsn > b->lsn)
5128  return 1;
5129  return 0;
5130 }
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, and lfirst.

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2891 of file reorderbuffer.c.

2893 {
2894  ReorderBufferTXN *txn;
2895 
2896  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2897  false);
2898 
2899  /* unknown, nothing to remove */
2900  if (txn == NULL)
2901  return;
2902 
2903  txn->xact_time.abort_time = abort_time;
2904 
2905  /* For streamed transactions notify the remote node about the abort. */
2906  if (rbtxn_is_streamed(txn))
2907  {
2908  rb->stream_abort(rb, txn, lsn);
2909 
2910  /*
2911  * We might have decoded changes for this transaction that could load
2912  * the cache as per the current transaction's view (consider DDL's
2913  * happened in this transaction). We don't want the decoding of future
2914  * transactions to use those cache entries so execute invalidations.
2915  */
2916  if (txn->ninvalidations > 0)
2918  txn->invalidations);
2919  }
2920 
2921  /* cosmetic... */
2922  txn->final_lsn = lsn;
2923 
2924  /* remove potential on-disk data, and deallocate */
2925  ReorderBufferCleanupTXN(rb, txn);
2926 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@106 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 2936 of file reorderbuffer.c.

2937 {
2938  dlist_mutable_iter it;
2939 
2940  /*
2941  * Iterate through all (potential) toplevel TXNs and abort all that are
2942  * older than what possibly can be running. Once we've found the first
2943  * that is alive we stop, there might be some that acquired an xid earlier
2944  * but started writing later, but it's unlikely and they will be cleaned
2945  * up in a later call to this function.
2946  */
2948  {
2949  ReorderBufferTXN *txn;
2950 
2951  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2952 
2953  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2954  {
2955  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2956 
2957  /* Notify the remote node about the crash/immediate restart. */
2958  if (rbtxn_is_streamed(txn))
2959  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2960 
2961  /* remove potential on-disk data, and deallocate this tx */
2962  ReorderBufferCleanupTXN(rb, txn);
2963  }
2964  else
2965  return;
2966  }
2967 }
#define DEBUG2
Definition: elog.h:29
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog(), InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3269 of file reorderbuffer.c.

3272 {
3273  ReorderBufferTXN *txn;
3274  MemoryContext oldcontext;
3275  ReorderBufferChange *change;
3276 
3277  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3278 
3279  oldcontext = MemoryContextSwitchTo(rb->context);
3280 
3281  /*
3282  * Collect all the invalidations under the top transaction, if available,
3283  * so that we can execute them all together. See comments atop this
3284  * function.
3285  */
3286  txn = rbtxn_get_toptxn(txn);
3287 
3288  Assert(nmsgs > 0);
3289 
3290  /* Accumulate invalidations. */
3291  if (txn->ninvalidations == 0)
3292  {
3293  txn->ninvalidations = nmsgs;
3295  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3296  memcpy(txn->invalidations, msgs,
3297  sizeof(SharedInvalidationMessage) * nmsgs);
3298  }
3299  else
3300  {
3303  (txn->ninvalidations + nmsgs));
3304 
3305  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3306  nmsgs * sizeof(SharedInvalidationMessage));
3307  txn->ninvalidations += nmsgs;
3308  }
3309 
3310  change = ReorderBufferGetChange(rb);
3312  change->data.inval.ninvalidations = nmsgs;
3313  change->data.inval.invalidations = (SharedInvalidationMessage *)
3314  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3315  memcpy(change->data.inval.invalidations, msgs,
3316  sizeof(SharedInvalidationMessage) * nmsgs);
3317 
3318  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3319 
3320  MemoryContextSwitchTo(oldcontext);
3321 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1476
void * palloc(Size size)
Definition: mcxt.c:1226
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:69
ReorderBufferChangeType action
Definition: reorderbuffer.h:94
union ReorderBufferChange::@100 data
struct ReorderBufferChange::@100::@105 inval
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3232 of file reorderbuffer.c.

3236 {
3238  ReorderBufferTXN *txn;
3239 
3240  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3241 
3242  change->data.tuplecid.locator = locator;
3243  change->data.tuplecid.tid = tid;
3244  change->data.tuplecid.cmin = cmin;
3245  change->data.tuplecid.cmax = cmax;
3246  change->data.tuplecid.combocid = combocid;
3247  change->lsn = lsn;
3248  change->txn = txn;
3250 
3251  dlist_push_tail(&txn->tuplecids, &change->node);
3252  txn->ntuplecids++;
3253 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:72
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:97
struct ReorderBufferChange::@100::@104 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 304 of file reorderbuffer.c.

305 {
306  ReorderBuffer *buffer;
307  HASHCTL hash_ctl;
308  MemoryContext new_ctx;
309 
310  Assert(MyReplicationSlot != NULL);
311 
312  /* allocate memory in own context, to have better accountability */
314  "ReorderBuffer",
316 
317  buffer =
318  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
319 
320  memset(&hash_ctl, 0, sizeof(hash_ctl));
321 
322  buffer->context = new_ctx;
323 
324  buffer->change_context = SlabContextCreate(new_ctx,
325  "Change",
327  sizeof(ReorderBufferChange));
328 
329  buffer->txn_context = SlabContextCreate(new_ctx,
330  "TXN",
332  sizeof(ReorderBufferTXN));
333 
334  /*
335  * XXX the allocation sizes used below pre-date generation context's block
336  * growing code. These values should likely be benchmarked and set to
337  * more suitable values.
338  */
339  buffer->tup_context = GenerationContextCreate(new_ctx,
340  "Tuples",
344 
345  hash_ctl.keysize = sizeof(TransactionId);
346  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
347  hash_ctl.hcxt = buffer->context;
348 
349  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
351 
353  buffer->by_txn_last_txn = NULL;
354 
355  buffer->outbuf = NULL;
356  buffer->outbufsize = 0;
357  buffer->size = 0;
358 
359  buffer->spillTxns = 0;
360  buffer->spillCount = 0;
361  buffer->spillBytes = 0;
362  buffer->streamTxns = 0;
363  buffer->streamCount = 0;
364  buffer->streamBytes = 0;
365  buffer->totalTxns = 0;
366  buffer->totalBytes = 0;
367 
369 
370  dlist_init(&buffer->toplevel_by_lsn);
372  dclist_init(&buffer->catchange_txns);
373 
374  /*
375  * Ensure there's no stale data from prior uses of this slot, in case some
376  * prior exit avoided calling ReorderBufferFree. Failure to do this can
377  * produce duplicated txns, and it's very cheap if there's nothing there.
378  */
380 
381  return buffer;
382 }
#define NameStr(name)
Definition: c.h:735
uint32 TransactionId
Definition: c.h:641
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:157
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:182
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:183
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:162
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1957 of file reorderbuffer.c.

1960 {
1961  if (streaming)
1962  rb->stream_change(rb, txn, relation, change);
1963  else
1964  rb->apply_change(rb, txn, relation, change);
1965 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1985 of file reorderbuffer.c.

1987 {
1988  if (streaming)
1989  rb->stream_message(rb, txn, change->lsn, true,
1990  change->data.msg.prefix,
1991  change->data.msg.message_size,
1992  change->data.msg.message);
1993  else
1994  rb->message(rb, txn, change->lsn, true,
1995  change->data.msg.prefix,
1996  change->data.msg.message_size,
1997  change->data.msg.message);
1998 }
struct ReorderBufferChange::@100::@103 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1971 of file reorderbuffer.c.

1974 {
1975  if (streaming)
1976  rb->stream_truncate(rb, txn, nrelations, relations, change);
1977  else
1978  rb->apply_truncate(rb, txn, nrelations, relations, change);
1979 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1064 of file reorderbuffer.c.

1066 {
1067  ReorderBufferTXN *txn;
1068  ReorderBufferTXN *subtxn;
1069  bool new_top;
1070  bool new_sub;
1071 
1072  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1073  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1074 
1075  if (!new_sub)
1076  {
1077  if (rbtxn_is_known_subxact(subtxn))
1078  {
1079  /* already associated, nothing to do */
1080  return;
1081  }
1082  else
1083  {
1084  /*
1085  * We already saw this transaction, but initially added it to the
1086  * list of top-level txns. Now that we know it's not top-level,
1087  * remove it from there.
1088  */
1089  dlist_delete(&subtxn->node);
1090  }
1091  }
1092 
1093  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1094  subtxn->toplevel_xid = xid;
1095  Assert(subtxn->nsubtxns == 0);
1096 
1097  /* set the reference to top-level transaction */
1098  subtxn->toptxn = txn;
1099 
1100  /* add to subtransaction list */
1101  dlist_push_tail(&txn->subtxns, &subtxn->node);
1102  txn->nsubtxns++;
1103 
1104  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1105  ReorderBufferTransferSnapToParent(txn, subtxn);
1106 
1107  /* Verify LSN-ordering invariant */
1108  AssertTXNLsnOrder(rb);
1109 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1723 of file reorderbuffer.c.

1724 {
1725  dlist_iter iter;
1726  HASHCTL hash_ctl;
1727 
1729  return;
1730 
1731  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1732  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1733  hash_ctl.hcxt = rb->context;
1734 
1735  /*
1736  * create the hash with the exact number of to-be-stored tuplecids from
1737  * the start
1738  */
1739  txn->tuplecid_hash =
1740  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1742 
1743  dlist_foreach(iter, &txn->tuplecids)
1744  {
1747  bool found;
1748  ReorderBufferChange *change;
1749 
1750  change = dlist_container(ReorderBufferChange, node, iter.cur);
1751 
1753 
1754  /* be careful about padding */
1755  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1756 
1757  key.rlocator = change->data.tuplecid.locator;
1758 
1759  ItemPointerCopy(&change->data.tuplecid.tid,
1760  &key.tid);
1761 
1762  ent = (ReorderBufferTupleCidEnt *)
1763  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1764  if (!found)
1765  {
1766  ent->cmin = change->data.tuplecid.cmin;
1767  ent->cmax = change->data.tuplecid.cmax;
1768  ent->combocid = change->data.tuplecid.combocid;
1769  }
1770  else
1771  {
1772  /*
1773  * Maybe we already saw this tuple before in this transaction, but
1774  * if so it must have the same cmin.
1775  */
1776  Assert(ent->cmin == change->data.tuplecid.cmin);
1777 
1778  /*
1779  * cmax may be initially invalid, but once set it can only grow,
1780  * and never become invalid again.
1781  */
1782  Assert((ent->cmax == InvalidCommandId) ||
1783  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1784  (change->data.tuplecid.cmax > ent->cmax)));
1785  ent->cmax = change->data.tuplecid.cmax;
1786  }
1787  }
1788 }
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 3966 of file reorderbuffer.c.

3967 {
3969  SnapBuild *builder = ctx->snapshot_builder;
3970 
3971  /* We can't start streaming unless a consistent state is reached. */
3973  return false;
3974 
3975  /*
3976  * We can't start streaming immediately even if the streaming is enabled
3977  * because we previously decoded this transaction and now just are
3978  * restarting.
3979  */
3980  if (ReorderBufferCanStream(rb) &&
3981  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
3982  return true;
3983 
3984  return false;
3985 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:407
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

Definition at line 3957 of file reorderbuffer.c.

3958 {
3960 
3961  return ctx->streaming;
3962 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
bool  addition,
Size  sz 
)
static

Definition at line 3178 of file reorderbuffer.c.

3181 {
3182  ReorderBufferTXN *txn;
3183  ReorderBufferTXN *toptxn;
3184 
3185  Assert(change->txn);
3186 
3187  /*
3188  * Ignore tuple CID changes, because those are not evicted when reaching
3189  * memory limit. So we just don't count them, because it might easily
3190  * trigger a pointless attempt to spill.
3191  */
3193  return;
3194 
3195  txn = change->txn;
3196 
3197  /*
3198  * Update the total size in top level as well. This is later used to
3199  * compute the decoding stats.
3200  */
3201  toptxn = rbtxn_get_toptxn(txn);
3202 
3203  if (addition)
3204  {
3205  txn->size += sz;
3206  rb->size += sz;
3207 
3208  /* Update the total size in the top transaction. */
3209  toptxn->total_size += sz;
3210  }
3211  else
3212  {
3213  Assert((rb->size >= sz) && (txn->size >= sz));
3214  txn->size -= sz;
3215  rb->size -= sz;
3216 
3217  /* Update the total size in the top transaction. */
3218  toptxn->total_size -= sz;
3219  }
3220 
3221  Assert(txn->size <= rb->size);
3222 }

References ReorderBufferChange::action, Assert(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4109 of file reorderbuffer.c.

4110 {
4111  Size sz = sizeof(ReorderBufferChange);
4112 
4113  switch (change->action)
4114  {
4115  /* fall through these, they're all similar enough */
4120  {
4121  ReorderBufferTupleBuf *oldtup,
4122  *newtup;
4123  Size oldlen = 0;
4124  Size newlen = 0;
4125 
4126  oldtup = change->data.tp.oldtuple;
4127  newtup = change->data.tp.newtuple;
4128 
4129  if (oldtup)
4130  {
4131  sz += sizeof(HeapTupleData);
4132  oldlen = oldtup->tuple.t_len;
4133  sz += oldlen;
4134  }
4135 
4136  if (newtup)
4137  {
4138  sz += sizeof(HeapTupleData);
4139  newlen = newtup->tuple.t_len;
4140  sz += newlen;
4141  }
4142 
4143  break;
4144  }
4146  {
4147  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4148 
4149  sz += prefix_size + change->data.msg.message_size +
4150  sizeof(Size) + sizeof(Size);
4151 
4152  break;
4153  }
4155  {
4156  sz += sizeof(SharedInvalidationMessage) *
4157  change->data.inval.ninvalidations;
4158  break;
4159  }
4161  {
4162  Snapshot snap;
4163 
4164  snap = change->data.snapshot;
4165 
4166  sz += sizeof(SnapshotData) +
4167  sizeof(TransactionId) * snap->xcnt +
4168  sizeof(TransactionId) * snap->subxcnt;
4169 
4170  break;
4171  }
4173  {
4174  sz += sizeof(Oid) * change->data.truncate.nrelids;
4175 
4176  break;
4177  }
4182  /* ReorderBufferChange contains everything important */
4183  break;
4184  }
4185 
4186  return sz;
4187 }
size_t Size
Definition: c.h:594
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:68
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:76
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:67
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@100::@101 tp
struct ReorderBufferChange::@100::@102 truncate
HeapTupleData tuple
Definition: reorderbuffer.h:38
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3578 of file reorderbuffer.c.

3579 {
3580  ReorderBufferTXN *txn;
3581 
3582  /*
3583  * Bail out if debug_logical_replication_streaming is buffered and we
3584  * haven't exceeded the memory limit.
3585  */
3587  rb->size < logical_decoding_work_mem * 1024L)
3588  return;
3589 
3590  /*
3591  * If debug_logical_replication_streaming is immediate, loop until there's
3592  * no change. Otherwise, loop until we reach under the memory limit. One
3593  * might think that just by evicting the largest (sub)transaction we will
3594  * come under the memory limit based on assumption that the selected
3595  * transaction is at least as large as the most recent change (which
3596  * caused us to go over the memory limit). However, that is not true
3597  * because a user can reduce the logical_decoding_work_mem to a smaller
3598  * value before the most recent change.
3599  */
3600  while (rb->size >= logical_decoding_work_mem * 1024L ||
3602  rb->size > 0))
3603  {
3604  /*
3605  * Pick the largest transaction and evict it from memory by streaming,
3606  * if possible. Otherwise, spill to disk.
3607  */
3609  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3610  {
3611  /* we know there has to be one, because the size is not zero */
3612  Assert(txn && rbtxn_is_toptxn(txn));
3613  Assert(txn->total_size > 0);
3614  Assert(rb->size >= txn->total_size);
3615 
3616  ReorderBufferStreamTXN(rb, txn);
3617  }
3618  else
3619  {
3620  /*
3621  * Pick the largest transaction (or subtransaction) and evict it
3622  * from memory by serializing it to disk.
3623  */
3624  txn = ReorderBufferLargestTXN(rb);
3625 
3626  /* we know there has to be one, because the size is not zero */
3627  Assert(txn);
3628  Assert(txn->size > 0);
3629  Assert(rb->size >= txn->size);
3630 
3631  ReorderBufferSerializeTXN(rb, txn);
3632  }
3633 
3634  /*
3635  * After eviction, the transaction should have no entries in memory,
3636  * and should use 0 bytes for changes.
3637  */
3638  Assert(txn->size == 0);
3639  Assert(txn->nentries_mem == 0);
3640  }
3641 
3642  /* We must be under the memory limit now. */
3643  Assert(rb->size < logical_decoding_work_mem * 1024L);
3644 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:28
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:27
#define rbtxn_is_toptxn(txn)

References Assert(), DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4535 of file reorderbuffer.c.

4536 {
4537  DIR *spill_dir;
4538  struct dirent *spill_de;
4539  struct stat statbuf;
4540  char path[MAXPGPATH * 2 + 12];
4541 
4542  sprintf(path, "pg_replslot/%s", slotname);
4543 
4544  /* we're only handling directories here, skip if it's not ours */
4545  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4546  return;
4547 
4548  spill_dir = AllocateDir(path);
4549  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4550  {
4551  /* only look at names that can be ours */
4552  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4553  {
4554  snprintf(path, sizeof(path),
4555  "pg_replslot/%s/%s", slotname,
4556  spill_de->d_name);
4557 
4558  if (unlink(path) != 0)
4559  ereport(ERROR,
4561  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4562  path, slotname)));
4563  }
4564  }
4565  FreeDir(spill_dir);
4566 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2906
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2869
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2788
#define snprintf
Definition: port.h:238
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1500 of file reorderbuffer.c.

1501 {
1502  bool found;
1503  dlist_mutable_iter iter;
1504 
1505  /* cleanup subtransactions & their changes */
1506  dlist_foreach_modify(iter, &txn->subtxns)
1507  {
1508  ReorderBufferTXN *subtxn;
1509 
1510  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1511 
1512  /*
1513  * Subtransactions are always associated to the toplevel TXN, even if
1514  * they originally were happening inside another subtxn, so we won't
1515  * ever recurse more than one level deep here.
1516  */
1517  Assert(rbtxn_is_known_subxact(subtxn));
1518  Assert(subtxn->nsubtxns == 0);
1519 
1520  ReorderBufferCleanupTXN(rb, subtxn);
1521  }
1522 
1523  /* cleanup changes in the txn */
1524  dlist_foreach_modify(iter, &txn->changes)
1525  {
1526  ReorderBufferChange *change;
1527 
1528  change = dlist_container(ReorderBufferChange, node, iter.cur);
1529 
1530  /* Check we're not mixing changes from different transactions. */
1531  Assert(change->txn == txn);
1532 
1533  ReorderBufferReturnChange(rb, change, true);
1534  }
1535 
1536  /*
1537  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1538  * They are always stored in the toplevel transaction.
1539  */
1540  dlist_foreach_modify(iter, &txn->tuplecids)
1541  {
1542  ReorderBufferChange *change;
1543 
1544  change = dlist_container(ReorderBufferChange, node, iter.cur);
1545 
1546  /* Check we're not mixing changes from different transactions. */
1547  Assert(change->txn == txn);
1549 
1550  ReorderBufferReturnChange(rb, change, true);
1551  }
1552 
1553  /*
1554  * Cleanup the base snapshot, if set.
1555  */
1556  if (txn->base_snapshot != NULL)
1557  {
1560  }
1561 
1562  /*
1563  * Cleanup the snapshot for the last streamed run.
1564  */
1565  if (txn->snapshot_now != NULL)
1566  {
1567  Assert(rbtxn_is_streamed(txn));
1569  }
1570 
1571  /*
1572  * Remove TXN from its containing lists.
1573  *
1574  * Note: if txn is known as subxact, we are deleting the TXN from its
1575  * parent's list of known subxacts; this leaves the parent's nsubxacts
1576  * count too high, but we don't care. Otherwise, we are deleting the TXN
1577  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1578  * list of catalog modifying transactions as well.
1579  */
1580  dlist_delete(&txn->node);
1581  if (rbtxn_has_catalog_changes(txn))
1583 
1584  /* now remove reference from buffer */
1585  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1586  Assert(found);
1587 
1588  /* remove entries spilled to disk */
1589  if (rbtxn_is_serialized(txn))
1590  ReorderBufferRestoreCleanup(rb, txn);
1591 
1592  /* deallocate */
1593  ReorderBufferReturnTXN(rb, txn);
1594 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:458
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2700 of file reorderbuffer.c.

2704 {
2705  ReorderBufferTXN *txn;
2706 
2707  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2708  false);
2709 
2710  /* unknown transaction, nothing to replay */
2711  if (txn == NULL)
2712  return;
2713 
2714  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2715  origin_id, origin_lsn);
2716 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1184 of file reorderbuffer.c.

1187 {
1188  ReorderBufferTXN *subtxn;
1189 
1190  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1191  InvalidXLogRecPtr, false);
1192 
1193  /*
1194  * No need to do anything if that subtxn didn't contain any changes
1195  */
1196  if (!subtxn)
1197  return;
1198 
1199  subtxn->final_lsn = commit_lsn;
1200  subtxn->end_lsn = end_lsn;
1201 
1202  /*
1203  * Assign this subxact as a child of the toplevel xact (no-op if already
1204  * done.)
1205  */
1206  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1207 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1796 of file reorderbuffer.c.

1798 {
1799  Snapshot snap;
1800  dlist_iter iter;
1801  int i = 0;
1802  Size size;
1803 
1804  size = sizeof(SnapshotData) +
1805  sizeof(TransactionId) * orig_snap->xcnt +
1806  sizeof(TransactionId) * (txn->nsubtxns + 1);
1807 
1808  snap = MemoryContextAllocZero(rb->context, size);
1809  memcpy(snap, orig_snap, sizeof(SnapshotData));
1810 
1811  snap->copied = true;
1812  snap->active_count = 1; /* mark as active so nobody frees it */
1813  snap->regd_count = 0;
1814  snap->xip = (TransactionId *) (snap + 1);
1815 
1816  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1817 
1818  /*
1819  * snap->subxip contains all txids that belong to our transaction which we
1820  * need to check via cmin/cmax. That's why we store the toplevel
1821  * transaction in there as well.
1822  */
1823  snap->subxip = snap->xip + snap->xcnt;
1824  snap->subxip[i++] = txn->xid;
1825 
1826  /*
1827  * subxcnt isn't decreased when subtransactions abort, so count manually.
1828  * Since it's an upper boundary it is safe to use it for the allocation
1829  * above.
1830  */
1831  snap->subxcnt = 1;
1832 
1833  dlist_foreach(iter, &txn->subtxns)
1834  {
1835  ReorderBufferTXN *sub_txn;
1836 
1837  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1838  snap->subxip[i++] = sub_txn->xid;
1839  snap->subxcnt++;
1840  }
1841 
1842  /* sort so we can bsearch() later */
1843  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1844 
1845  /* store the specified current CommandId */
1846  snap->curcid = cid;
1847 
1848  return snap;
1849 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064
#define qsort(a, b, c, d)
Definition: port.h:445
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3328 of file reorderbuffer.c.

3329 {
3330  int i;
3331 
3332  for (i = 0; i < nmsgs; i++)
3334 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:615

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2806 of file reorderbuffer.c.

2811 {
2812  ReorderBufferTXN *txn;
2813  XLogRecPtr prepare_end_lsn;
2814  TimestampTz prepare_time;
2815 
2816  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2817 
2818  /* unknown transaction, nothing to do */
2819  if (txn == NULL)
2820  return;
2821 
2822  /*
2823  * By this time the txn has the prepare record information, remember it to
2824  * be later used for rollback.
2825  */
2826  prepare_end_lsn = txn->end_lsn;
2827  prepare_time = txn->xact_time.prepare_time;
2828 
2829  /* add the gid in the txn */
2830  txn->gid = pstrdup(gid);
2831 
2832  /*
2833  * It is possible that this transaction is not decoded at prepare time
2834  * either because by that time we didn't have a consistent snapshot, or
2835  * two_phase was not enabled, or it was decoded earlier but we have
2836  * restarted. We only need to send the prepare if it was not decoded
2837  * earlier. We don't need to decode the xact for aborts if it is not done
2838  * already.
2839  */
2840  if ((txn->final_lsn < two_phase_at) && is_commit)
2841  {
2842  txn->txn_flags |= RBTXN_PREPARE;
2843 
2844  /*
2845  * The prepare info must have been updated in txn even if we skip
2846  * prepare.
2847  */
2849 
2850  /*
2851  * By this time the txn has the prepare record information and it is
2852  * important to use that so that downstream gets the accurate
2853  * information. If instead, we have passed commit information here
2854  * then downstream can behave as it has already replayed commit
2855  * prepared after the restart.
2856  */
2857  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2858  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2859  }
2860 
2861  txn->final_lsn = commit_lsn;
2862  txn->end_lsn = end_lsn;
2863  txn->xact_time.commit_time = commit_time;
2864  txn->origin_id = origin_id;
2865  txn->origin_lsn = origin_lsn;
2866 
2867  if (is_commit)
2868  rb->commit_prepared(rb, txn, commit_lsn);
2869  else
2870  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2871 
2872  /* cleanup: make sure there's no cache pollution */
2874  txn->invalidations);
2875  ReorderBufferCleanupTXN(rb, txn);
2876 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1644
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2983 of file reorderbuffer.c.

2984 {
2985  ReorderBufferTXN *txn;
2986 
2987  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2988  false);
2989 
2990  /* unknown, nothing to forget */
2991  if (txn == NULL)
2992  return;
2993 
2994  /* this transaction mustn't be streamed */
2995  Assert(!rbtxn_is_streamed(txn));
2996 
2997  /* cosmetic... */
2998  txn->final_lsn = lsn;
2999 
3000  /*
3001  * Process cache invalidation messages if there are any. Even if we're not
3002  * interested in the transaction's contents, it could have manipulated the
3003  * catalog and we need to update the caches according to that.
3004  */
3005  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3007  txn->invalidations);
3008  else
3009  Assert(txn->ninvalidations == 0);
3010 
3011  /* remove potential on-disk data, and deallocate */
3012  ReorderBufferCleanupTXN(rb, txn);
3013 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 388 of file reorderbuffer.c.

389 {
390  MemoryContext context = rb->context;
391 
392  /*
393  * We free separately allocated data by entirely scrapping reorderbuffer's
394  * memory context.
395  */
396  MemoryContextDelete(context);
397 
398  /* Free disk space used by unconsumed reorder buffers */
400 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1855 of file reorderbuffer.c.

1856 {
1857  if (snap->copied)
1858  pfree(snap);
1859  else
1861 }
void pfree(void *pointer)
Definition: mcxt.c:1456

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3378 of file reorderbuffer.c.

3379 {
3380  dlist_iter iter;
3381  TransactionId *xids = NULL;
3382  size_t xcnt = 0;
3383 
3384  /* Quick return if the list is empty */
3385  if (dclist_count(&rb->catchange_txns) == 0)
3386  return NULL;
3387 
3388  /* Initialize XID array */
3389  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3391  dclist_foreach(iter, &rb->catchange_txns)
3392  {
3394  catchange_node,
3395  iter.cur);
3396 
3398 
3399  xids[xcnt++] = txn->xid;
3400  }
3401 
3402  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3403 
3404  Assert(xcnt == dclist_count(&rb->catchange_txns));
3405  return xids;
3406 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1009 of file reorderbuffer.c.

1010 {
1011  ReorderBufferTXN *txn;
1012 
1013  AssertTXNLsnOrder(rb);
1014 
1015  if (dlist_is_empty(&rb->toplevel_by_lsn))
1016  return NULL;
1017 
1019 
1022  return txn;
1023 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 1037 of file reorderbuffer.c.

1038 {
1039  ReorderBufferTXN *txn;
1040 
1041  AssertTXNLsnOrder(rb);
1042 
1044  return InvalidTransactionId;
1045 
1046  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1048  return txn->base_snapshot->xmin;
1049 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 590 of file reorderbuffer.c.

591 {
592  Oid *relids;
593  Size alloc_len;
594 
595  alloc_len = sizeof(Oid) * nrelids;
596 
597  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
598 
599  return relids;
600 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 554 of file reorderbuffer.c.

555 {
556  ReorderBufferTupleBuf *tuple;
557  Size alloc_len;
558 
559  alloc_len = tuple_len + SizeofHeapTupleHeader;
560 
561  tuple = (ReorderBufferTupleBuf *)
563  sizeof(ReorderBufferTupleBuf) +
564  MAXIMUM_ALIGNOF + alloc_len);
565  tuple->alloc_tuple_size = alloc_len;
566  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
567 
568  return tuple;
569 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:47
HeapTupleHeader t_data
Definition: htup.h:68

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 406 of file reorderbuffer.c.

407 {
408  ReorderBufferTXN *txn;
409 
410  txn = (ReorderBufferTXN *)
412 
413  memset(txn, 0, sizeof(ReorderBufferTXN));
414 
415  dlist_init(&txn->changes);
416  dlist_init(&txn->tuplecids);
417  dlist_init(&txn->subtxns);
418 
419  /* InvalidCommandId is not zero, so set it explicitly */
421  txn->output_plugin_private = NULL;
422 
423  return txn;
424 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3055 of file reorderbuffer.c.

3057 {
3058  bool use_subtxn = IsTransactionOrTransactionBlock();
3059  int i;
3060 
3061  if (use_subtxn)
3062  BeginInternalSubTransaction("replay");
3063 
3064  /*
3065  * Force invalidations to happen outside of a valid transaction - that way
3066  * entries will just be marked as invalid without accessing the catalog.
3067  * That's advantageous because we don't need to setup the full state
3068  * necessary for catalog access.
3069  */
3070  if (use_subtxn)
3072 
3073  for (i = 0; i < ninvalidations; i++)
3074  LocalExecuteInvalidationMessage(&invalidations[i]);
3075 
3076  if (use_subtxn)
3078 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4834
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4540
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4645
void AbortCurrentTransaction(void)
Definition: xact.c:3305

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3024 of file reorderbuffer.c.

3025 {
3026  ReorderBufferTXN *txn;
3027 
3028  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3029  false);
3030 
3031  /* unknown, nothing to do */
3032  if (txn == NULL)
3033  return;
3034 
3035  /*
3036  * Process cache invalidation messages if there are any. Even if we're not
3037  * interested in the transaction's contents, it could have manipulated the
3038  * catalog and we need to update the caches according to that.
3039  */
3040  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3042  txn->invalidations);
3043  else
3044  Assert(txn->ninvalidations == 0);
3045 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1226 of file reorderbuffer.c.

1227 {
1229  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1230  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1231 
1232  if (pos_a < pos_b)
1233  return 1;
1234  else if (pos_a == pos_b)
1235  return 0;
1236  return -1;
1237 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1469 of file reorderbuffer.c.

1471 {
1472  int32 off;
1473 
1474  for (off = 0; off < state->nr_txns; off++)
1475  {
1476  if (state->entries[off].file.vfd != -1)
1477  FileClose(state->entries[off].file.vfd);
1478  }
1479 
1480  /* free memory we might have "leaked" in the last *Next call */
1481  if (!dlist_is_empty(&state->old_change))
1482  {
1483  ReorderBufferChange *change;
1484 
1485  change = dlist_container(ReorderBufferChange, node,
1486  dlist_pop_head_node(&state->old_change));
1487  ReorderBufferReturnChange(rb, change, true);
1488  Assert(dlist_is_empty(&state->old_change));
1489  }
1490 
1491  binaryheap_free(state->heap);
1492  pfree(state);
1493 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1930
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1249 of file reorderbuffer.c.

1251 {
1252  Size nr_txns = 0;
1254  dlist_iter cur_txn_i;
1255  int32 off;
1256 
1257  *iter_state = NULL;
1258 
1259  /* Check ordering of changes in the toplevel transaction. */
1260  AssertChangeLsnOrder(txn);
1261 
1262  /*
1263  * Calculate the size of our heap: one element for every transaction that
1264  * contains changes. (Besides the transactions already in the reorder
1265  * buffer, we count the one we were directly passed.)
1266  */
1267  if (txn->nentries > 0)
1268  nr_txns++;
1269 
1270  dlist_foreach(cur_txn_i, &txn->subtxns)
1271  {
1272  ReorderBufferTXN *cur_txn;
1273 
1274  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1275 
1276  /* Check ordering of changes in this subtransaction. */
1277  AssertChangeLsnOrder(cur_txn);
1278 
1279  if (cur_txn->nentries > 0)
1280  nr_txns++;
1281  }
1282 
1283  /* allocate iteration state */
1286  sizeof(ReorderBufferIterTXNState) +
1287  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1288 
1289  state->nr_txns = nr_txns;
1290  dlist_init(&state->old_change);
1291 
1292  for (off = 0; off < state->nr_txns; off++)
1293  {
1294  state->entries[off].file.vfd = -1;
1295  state->entries[off].segno = 0;
1296  }
1297 
1298  /* allocate heap */
1299  state->heap = binaryheap_allocate(state->nr_txns,
1301  state);
1302 
1303  /* Now that the state fields are initialized, it is safe to return it. */
1304  *iter_state = state;
1305 
1306  /*
1307  * Now insert items into the binary heap, in an unordered fashion. (We
1308  * will run a heap assembly step at the end; this is more efficient.)
1309  */
1310 
1311  off = 0;
1312 
1313  /* add toplevel transaction if it contains changes */
1314  if (txn->nentries > 0)
1315  {
1316  ReorderBufferChange *cur_change;
1317 
1318  if (rbtxn_is_serialized(txn))
1319  {
1320  /* serialize remaining changes */
1321  ReorderBufferSerializeTXN(rb, txn);
1322  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1323  &state->entries[off].segno);
1324  }
1325 
1326  cur_change = dlist_head_element(ReorderBufferChange, node,
1327  &txn->changes);
1328 
1329  state->entries[off].lsn = cur_change->lsn;
1330  state->entries[off].change = cur_change;
1331  state->entries[off].txn = txn;
1332 
1334  }
1335 
1336  /* add subtransactions if they contain changes */
1337  dlist_foreach(cur_txn_i, &txn->subtxns)
1338  {
1339  ReorderBufferTXN *cur_txn;
1340 
1341  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1342 
1343  if (cur_txn->nentries > 0)
1344  {
1345  ReorderBufferChange *cur_change;
1346 
1347  if (rbtxn_is_serialized(cur_txn))
1348  {
1349  /* serialize remaining changes */
1350  ReorderBufferSerializeTXN(rb, cur_txn);
1351  ReorderBufferRestoreChanges(rb, cur_txn,
1352  &state->entries[off].file,
1353  &state->entries[off].segno);
1354  }
1355  cur_change = dlist_head_element(ReorderBufferChange, node,
1356  &cur_txn->changes);
1357 
1358  state->entries[off].lsn = cur_change->lsn;
1359  state->entries[off].change = cur_change;
1360  state->entries[off].txn = cur_txn;
1361 
1363  }
1364  }
1365 
1366  /* assemble a valid binary heap */
1367  binaryheap_build(state->heap);
1368 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1377 of file reorderbuffer.c.

1378 {
1379  ReorderBufferChange *change;
1381  int32 off;
1382 
1383  /* nothing there anymore */
1384  if (state->heap->bh_size == 0)
1385  return NULL;
1386 
1387  off = DatumGetInt32(binaryheap_first(state->heap));
1388  entry = &state->entries[off];
1389 
1390  /* free memory we might have "leaked" in the previous *Next call */
1391  if (!dlist_is_empty(&state->old_change))
1392  {
1393  change = dlist_container(ReorderBufferChange, node,
1394  dlist_pop_head_node(&state->old_change));
1395  ReorderBufferReturnChange(rb, change, true);
1396  Assert(dlist_is_empty(&state->old_change));
1397  }
1398 
1399  change = entry->change;
1400 
1401  /*
1402  * update heap with information about which transaction has the next
1403  * relevant change in LSN order
1404  */
1405 
1406  /* there are in-memory changes */
1407  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1408  {
1409  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1410  ReorderBufferChange *next_change =
1412 
1413  /* txn stays the same */
1414  state->entries[off].lsn = next_change->lsn;
1415  state->entries[off].change = next_change;
1416 
1418  return change;
1419  }
1420 
1421  /* try to load changes from disk */
1422  if (entry->txn->nentries != entry->txn->nentries_mem)
1423  {
1424  /*
1425  * Ugly: restoring changes will reuse *Change records, thus delete the
1426  * current one from the per-tx list and only free in the next call.
1427  */
1428  dlist_delete(&change->node);
1429  dlist_push_tail(&state->old_change, &change->node);
1430 
1431  /*
1432  * Update the total bytes processed by the txn for which we are
1433  * releasing the current set of changes and restoring the new set of
1434  * changes.
1435  */
1436  rb->totalBytes += entry->txn->size;
1437  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1438  &state->entries[off].segno))
1439  {
1440  /* successfully restored changes from disk */
1441  ReorderBufferChange *next_change =
1443  &entry->txn->changes);
1444 
1445  elog(DEBUG2, "restored %u/%u changes from disk",
1446  (uint32) entry->txn->nentries_mem,
1447  (uint32) entry->txn->nentries);
1448 
1449  Assert(entry->txn->nentries_mem);
1450  /* txn stays the same */
1451  state->entries[off].lsn = next_change->lsn;
1452  state->entries[off].change = next_change;
1454 
1455  return change;
1456  }
1457  }
1458 
1459  /* ok, no changes there anymore, remove */
1461 
1462  return change;
1463 }
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:495
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog(), ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3534 of file reorderbuffer.c.

3535 {
3536  dlist_iter iter;
3537  Size largest_size = 0;
3538  ReorderBufferTXN *largest = NULL;
3539 
3540  /* Find the largest top-level transaction having a base snapshot. */
3542  {
3543  ReorderBufferTXN *txn;
3544 
3545  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3546 
3547  /* must not be a subtxn */
3549  /* base_snapshot must be set */
3550  Assert(txn->base_snapshot != NULL);
3551 
3552  if ((largest == NULL || txn->total_size > largest_size) &&
3553  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3555  {
3556  largest = txn;
3557  largest_size = txn->total_size;
3558  }
3559  }
3560 
3561  return largest;
3562 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3486 of file reorderbuffer.c.

3487 {
3488  HASH_SEQ_STATUS hash_seq;
3490  ReorderBufferTXN *largest = NULL;
3491 
3492  hash_seq_init(&hash_seq, rb->by_txn);
3493  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3494  {
3495  ReorderBufferTXN *txn = ent->txn;
3496 
3497  /* if the current transaction is larger, remember it */
3498  if ((!largest) || (txn->size > largest->size))
3499  largest = txn;
3500  }
3501 
3502  Assert(largest);
3503  Assert(largest->size > 0);
3504  Assert(largest->size <= rb->size);
3505 
3506  return largest;
3507 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
ReorderBufferTXN * txn

References Assert(), ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2769 of file reorderbuffer.c.

2771 {
2772  ReorderBufferTXN *txn;
2773 
2774  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2775  false);
2776 
2777  /* unknown transaction, nothing to replay */
2778  if (txn == NULL)
2779  return;
2780 
2781  txn->txn_flags |= RBTXN_PREPARE;
2782  txn->gid = pstrdup(gid);
2783 
2784  /* The prepare info must have been updated in txn by now. */
2786 
2787  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2788  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2789 
2790  /*
2791  * We send the prepare for the concurrently aborted xacts so that later
2792  * when rollback prepared is decoded and sent, the downstream should be
2793  * able to rollback such a xact. See comments atop DecodePrepare.
2794  *
2795  * Note, for the concurrent_abort + streaming case a stream_prepare was
2796  * already sent within the ReorderBufferReplay call above.
2797  */
2798  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2799  rb->prepare(rb, txn, txn->final_lsn);
2800 }
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 706 of file reorderbuffer.c.

709 {
710  ReorderBufferTXN *toptxn;
711 
712  /*
713  * The partial changes need to be processed only while streaming
714  * in-progress transactions.
715  */
716  if (!ReorderBufferCanStream(rb))
717  return;
718 
719  /* Get the top transaction. */
720  toptxn = rbtxn_get_toptxn(txn);
721 
722  /*
723  * Indicate a partial change for toast inserts. The change will be
724  * considered as complete once we get the insert or update on the main
725  * table and we are sure that the pending toast chunks are not required
726  * anymore.
727  *
728  * If we allow streaming when there are pending toast chunks then such
729  * chunks won't be released till the insert (multi_insert) is complete and
730  * we expect the txn to have streamed all changes after streaming. This
731  * restriction is mainly to ensure the correctness of streamed
732  * transactions and it doesn't seem worth uplifting such a restriction
733  * just to allow this case because anyway we will stream the transaction
734  * once such an insert is complete.
735  */
736  if (toast_insert)
738  else if (rbtxn_has_partial_change(toptxn) &&
739  IsInsertOrUpdate(change->action) &&
740  change->data.tp.clear_toast_afterwards)
742 
743  /*
744  * Indicate a partial change for speculative inserts. The change will be
745  * considered as complete once we get the speculative confirm or abort
746  * token.
747  */
748  if (IsSpecInsert(change->action))
750  else if (rbtxn_has_partial_change(toptxn) &&
751  IsSpecConfirmOrAbort(change->action))
753 
754  /*
755  * Stream the transaction if it is serialized before and the changes are
756  * now complete in the top-level transaction.
757  *
758  * The reason for doing the streaming of such a transaction as soon as we
759  * get the complete change for it is that previously it would have reached
760  * the memory threshold and wouldn't get streamed because of incomplete
761  * changes. Delaying such transactions would increase apply lag for them.
762  */
764  !(rbtxn_has_partial_change(toptxn)) &&
765  rbtxn_is_serialized(txn) &&
767  ReorderBufferStreamTXN(rb, toptxn);
768 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2069 of file reorderbuffer.c.

2074 {
2075  bool using_subtxn;
2077  ReorderBufferIterTXNState *volatile iterstate = NULL;
2078  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2079  ReorderBufferChange *volatile specinsert = NULL;
2080  volatile bool stream_started = false;
2081  ReorderBufferTXN *volatile curtxn = NULL;
2082 
2083  /* build data to be able to lookup the CommandIds of catalog tuples */
2085 
2086  /* setup the initial snapshot */
2087  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2088 
2089  /*
2090  * Decoding needs access to syscaches et al., which in turn use
2091  * heavyweight locks and such. Thus we need to have enough state around to
2092  * keep track of those. The easiest way is to simply use a transaction
2093  * internally. That also allows us to easily enforce that nothing writes
2094  * to the database by checking for xid assignments.
2095  *
2096  * When we're called via the SQL SRF there's already a transaction
2097  * started, so start an explicit subtransaction there.
2098  */
2099  using_subtxn = IsTransactionOrTransactionBlock();
2100 
2101  PG_TRY();
2102  {
2103  ReorderBufferChange *change;
2104  int changes_count = 0; /* used to accumulate the number of
2105  * changes */
2106 
2107  if (using_subtxn)
2108  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2109  else
2111 
2112  /*
2113  * We only need to send begin/begin-prepare for non-streamed
2114  * transactions.
2115  */
2116  if (!streaming)
2117  {
2118  if (rbtxn_prepared(txn))
2119  rb->begin_prepare(rb, txn);
2120  else
2121  rb->begin(rb, txn);
2122  }
2123 
2124  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2125  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2126  {
2127  Relation relation = NULL;
2128  Oid reloid;
2129 
2131 
2132  /*
2133  * We can't call start stream callback before processing first
2134  * change.
2135  */
2136  if (prev_lsn == InvalidXLogRecPtr)
2137  {
2138  if (streaming)
2139  {
2140  txn->origin_id = change->origin_id;
2141  rb->stream_start(rb, txn, change->lsn);
2142  stream_started = true;
2143  }
2144  }
2145 
2146  /*
2147  * Enforce correct ordering of changes, merged from multiple
2148  * subtransactions. The changes may have the same LSN due to
2149  * MULTI_INSERT xlog records.
2150  */
2151  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2152 
2153  prev_lsn = change->lsn;
2154 
2155  /*
2156  * Set the current xid to detect concurrent aborts. This is
2157  * required for the cases when we decode the changes before the
2158  * COMMIT record is processed.
2159  */
2160  if (streaming || rbtxn_prepared(change->txn))
2161  {
2162  curtxn = change->txn;
2163  SetupCheckXidLive(curtxn->xid);
2164  }
2165 
2166  switch (change->action)
2167  {
2169 
2170  /*
2171  * Confirmation for speculative insertion arrived. Simply
2172  * use as a normal record. It'll be cleaned up at the end
2173  * of INSERT processing.
2174  */
2175  if (specinsert == NULL)
2176  elog(ERROR, "invalid ordering of speculative insertion changes");
2177  Assert(specinsert->data.tp.oldtuple == NULL);
2178  change = specinsert;
2180 
2181  /* intentionally fall through */
2185  Assert(snapshot_now);
2186 
2187  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2188  change->data.tp.rlocator.relNumber);
2189 
2190  /*
2191  * Mapped catalog tuple without data, emitted while
2192  * catalog table was in the process of being rewritten. We
2193  * can fail to look up the relfilenumber, because the
2194  * relmapper has no "historic" view, in contrast to the
2195  * normal catalog during decoding. Thus repeated rewrites
2196  * can cause a lookup failure. That's OK because we do not
2197  * decode catalog changes anyway. Normally such tuples
2198  * would be skipped over below, but we can't identify
2199  * whether the table should be logically logged without
2200  * mapping the relfilenumber to the oid.
2201  */
2202  if (reloid == InvalidOid &&
2203  change->data.tp.newtuple == NULL &&
2204  change->data.tp.oldtuple == NULL)
2205  goto change_done;
2206  else if (reloid == InvalidOid)
2207  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2208  relpathperm(change->data.tp.rlocator,
2209  MAIN_FORKNUM));
2210 
2211  relation = RelationIdGetRelation(reloid);
2212 
2213  if (!RelationIsValid(relation))
2214  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2215  reloid,
2216  relpathperm(change->data.tp.rlocator,
2217  MAIN_FORKNUM));
2218 
2219  if (!RelationIsLogicallyLogged(relation))
2220  goto change_done;
2221 
2222  /*
2223  * Ignore temporary heaps created during DDL unless the
2224  * plugin has asked for them.
2225  */
2226  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2227  goto change_done;
2228 
2229  /*
2230  * For now ignore sequence changes entirely. Most of the
2231  * time they don't log changes using records we
2232  * understand, so it doesn't make sense to handle the few
2233  * cases we do.
2234  */
2235  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2236  goto change_done;
2237 
2238  /* user-triggered change */
2239  if (!IsToastRelation(relation))
2240  {
2241  ReorderBufferToastReplace(rb, txn, relation, change);
2242  ReorderBufferApplyChange(rb, txn, relation, change,
2243  streaming);
2244 
2245  /*
2246  * Only clear reassembled toast chunks if we're sure
2247  * they're not required anymore. The creator of the
2248  * tuple tells us.
2249  */
2250  if (change->data.tp.clear_toast_afterwards)
2251  ReorderBufferToastReset(rb, txn);
2252  }
2253  /* we're not interested in toast deletions */
2254  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2255  {
2256  /*
2257  * Need to reassemble the full toasted Datum in
2258  * memory, to ensure the chunks don't get reused till
2259  * we're done remove it from the list of this
2260  * transaction's changes. Otherwise it will get
2261  * freed/reused while restoring spooled data from
2262  * disk.
2263  */
2264  Assert(change->data.tp.newtuple != NULL);
2265 
2266  dlist_delete(&change->node);
2267  ReorderBufferToastAppendChunk(rb, txn, relation,
2268  change);
2269  }
2270 
2271  change_done:
2272 
2273  /*
2274  * If speculative insertion was confirmed, the record
2275  * isn't needed anymore.
2276  */
2277  if (specinsert != NULL)
2278  {
2279  ReorderBufferReturnChange(rb, specinsert, true);
2280  specinsert = NULL;
2281  }
2282 
2283  if (RelationIsValid(relation))
2284  {
2285  RelationClose(relation);
2286  relation = NULL;
2287  }
2288  break;
2289 
2291 
2292  /*
2293  * Speculative insertions are dealt with by delaying the
2294  * processing of the insert until the confirmation record
2295  * arrives. For that we simply unlink the record from the
2296  * chain, so it does not get freed/reused while restoring
2297  * spooled data from disk.
2298  *
2299  * This is safe in the face of concurrent catalog changes
2300  * because the relevant relation can't be changed between
2301  * speculative insertion and confirmation due to
2302  * CheckTableNotInUse() and locking.
2303  */
2304 
2305  /* clear out a pending (and thus failed) speculation */
2306  if (specinsert != NULL)
2307  {
2308  ReorderBufferReturnChange(rb, specinsert, true);
2309  specinsert = NULL;
2310  }
2311 
2312  /* and memorize the pending insertion */
2313  dlist_delete(&change->node);
2314  specinsert = change;
2315  break;
2316 
2318 
2319  /*
2320  * Abort for speculative insertion arrived. So cleanup the
2321  * specinsert tuple and toast hash.
2322  *
2323  * Note that we get the spec abort change for each toast
2324  * entry but we need to perform the cleanup only the first
2325  * time we get it for the main table.
2326  */
2327  if (specinsert != NULL)
2328  {
2329  /*
2330  * We must clean the toast hash before processing a
2331  * completely new tuple to avoid confusion about the
2332  * previous tuple's toast chunks.
2333  */
2334  Assert(change->data.tp.clear_toast_afterwards);
2335  ReorderBufferToastReset(rb, txn);
2336 
2337  /* We don't need this record anymore. */
2338  ReorderBufferReturnChange(rb, specinsert, true);
2339  specinsert = NULL;
2340  }
2341  break;
2342 
2344  {
2345  int i;
2346  int nrelids = change->data.truncate.nrelids;
2347  int nrelations = 0;
2348  Relation *relations;
2349 
2350  relations = palloc0(nrelids * sizeof(Relation));
2351  for (i = 0; i < nrelids; i++)
2352  {
2353  Oid relid = change->data.truncate.relids[i];
2354  Relation rel;
2355 
2356  rel = RelationIdGetRelation(relid);
2357 
2358  if (!RelationIsValid(rel))
2359  elog(ERROR, "could not open relation with OID %u", relid);
2360 
2361  if (!RelationIsLogicallyLogged(rel))
2362  continue;
2363 
2364  relations[nrelations++] = rel;
2365  }
2366 
2367  /* Apply the truncate. */
2368  ReorderBufferApplyTruncate(rb, txn, nrelations,
2369  relations, change,
2370  streaming);
2371 
2372  for (i = 0; i < nrelations; i++)
2373  RelationClose(relations[i]);
2374 
2375  break;
2376  }
2377 
2379  ReorderBufferApplyMessage(rb, txn, change, streaming);
2380  break;
2381 
2383  /* Execute the invalidation messages locally */
2384  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2385  change->data.inval.invalidations);
2386  break;
2387 
2389  /* get rid of the old */
2390  TeardownHistoricSnapshot(false);
2391 
2392  if (snapshot_now->copied)
2393  {
2394  ReorderBufferFreeSnap(rb, snapshot_now);
2395  snapshot_now =
2396  ReorderBufferCopySnap(rb, change->data.snapshot,
2397  txn, command_id);
2398  }
2399 
2400  /*
2401  * Restored from disk, need to be careful not to double
2402  * free. We could introduce refcounting for that, but for
2403  * now this seems infrequent enough not to care.
2404  */
2405  else if (change->data.snapshot->copied)
2406  {
2407  snapshot_now =
2408  ReorderBufferCopySnap(rb, change->data.snapshot,
2409  txn, command_id);
2410  }
2411  else
2412  {
2413  snapshot_now = change->data.snapshot;
2414  }
2415 
2416  /* and continue with the new one */
2417  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2418  break;
2419 
2421  Assert(change->data.command_id != InvalidCommandId);
2422 
2423  if (command_id < change->data.command_id)
2424  {
2425  command_id = change->data.command_id;
2426 
2427  if (!snapshot_now->copied)
2428  {
2429  /* we don't use the global one anymore */
2430  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2431  txn, command_id);
2432  }
2433 
2434  snapshot_now->curcid = command_id;
2435 
2436  TeardownHistoricSnapshot(false);
2437  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2438  }
2439 
2440  break;
2441 
2443  elog(ERROR, "tuplecid value in changequeue");
2444  break;
2445  }
2446 
2447  /*
2448  * It is possible that the data is not sent to downstream for a
2449  * long time either because the output plugin filtered it or there
2450  * is a DDL that generates a lot of data that is not processed by
2451  * the plugin. So, in such cases, the downstream can timeout. To
2452  * avoid that we try to send a keepalive message if required.
2453  * Trying to send a keepalive message after every change has some
2454  * overhead, but testing showed there is no noticeable overhead if
2455  * we do it after every ~100 changes.
2456  */
2457 #define CHANGES_THRESHOLD 100
2458 
2459  if (++changes_count >= CHANGES_THRESHOLD)
2460  {
2461  rb->update_progress_txn(rb, txn, change->lsn);
2462  changes_count = 0;
2463  }
2464  }
2465 
2466  /* speculative insertion record must be freed by now */
2467  Assert(!specinsert);
2468 
2469  /* clean up the iterator */
2470  ReorderBufferIterTXNFinish(rb, iterstate);
2471  iterstate = NULL;
2472 
2473  /*
2474  * Update total transaction count and total bytes processed by the
2475  * transaction and its subtransactions. Ensure to not count the
2476  * streamed transaction multiple times.
2477  *
2478  * Note that the statistics computation has to be done after
2479  * ReorderBufferIterTXNFinish as it releases the serialized change
2480  * which we have already accounted in ReorderBufferIterTXNNext.
2481  */
2482  if (!rbtxn_is_streamed(txn))
2483  rb->totalTxns++;
2484 
2485  rb->totalBytes += txn->total_size;
2486 
2487  /*
2488  * Done with current changes, send the last message for this set of
2489  * changes depending upon streaming mode.
2490  */
2491  if (streaming)
2492  {
2493  if (stream_started)
2494  {
2495  rb->stream_stop(rb, txn, prev_lsn);
2496  stream_started = false;
2497  }
2498  }
2499  else
2500  {
2501  /*
2502  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2503  * regular ones).
2504  */
2505  if (rbtxn_prepared(txn))
2506  rb->prepare(rb, txn, commit_lsn);
2507  else
2508  rb->commit(rb, txn, commit_lsn);
2509  }
2510 
2511  /* this is just a sanity check against bad output plugin behaviour */
2513  elog(ERROR, "output plugin used XID %u",
2515 
2516  /*
2517  * Remember the command ID and snapshot for the next set of changes in
2518  * streaming mode.
2519  */
2520  if (streaming)
2521  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2522  else if (snapshot_now->copied)
2523  ReorderBufferFreeSnap(rb, snapshot_now);
2524 
2525  /* cleanup */
2526  TeardownHistoricSnapshot(false);
2527 
2528  /*
2529  * Aborting the current (sub-)transaction as a whole has the right
2530  * semantics. We want all locks acquired in here to be released, not
2531  * reassigned to the parent and we do not want any database access
2532  * have persistent effects.
2533  */
2535 
2536  /* make sure there's no cache pollution */
2538 
2539  if (using_subtxn)
2541 
2542  /*
2543  * We are here due to one of the four reasons: 1. Decoding an
2544  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2545  * prepared txn that was (partially) streamed. 4. Decoding a committed
2546  * txn.
2547  *
2548  * For 1, we allow truncation of txn data by removing the changes
2549  * already streamed but still keeping other things like invalidations,
2550  * snapshot, and tuplecids. For 2 and 3, we indicate
2551  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2552  * data as the entire transaction has been decoded except for commit.
2553  * For 4, as the entire txn has been decoded, we can fully clean up
2554  * the TXN reorder buffer.
2555  */
2556  if (streaming || rbtxn_prepared(txn))
2557  {
2559  /* Reset the CheckXidAlive */
2561  }
2562  else
2563  ReorderBufferCleanupTXN(rb, txn);
2564  }
2565  PG_CATCH();
2566  {
2567  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2568  ErrorData *errdata = CopyErrorData();
2569 
2570  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2571  if (iterstate)
2572  ReorderBufferIterTXNFinish(rb, iterstate);
2573 
2575 
2576  /*
2577  * Force cache invalidation to happen outside of a valid transaction
2578  * to prevent catalog access as we just caught an error.
2579  */
2581 
2582  /* make sure there's no cache pollution */
2584  txn->invalidations);
2585 
2586  if (using_subtxn)
2588 
2589  /*
2590  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2591  * abort of the (sub)transaction we are streaming or preparing. We
2592  * need to do the cleanup and return gracefully on this error, see
2593  * SetupCheckXidLive.
2594  *
2595  * This error code can be thrown by one of the callbacks we call
2596  * during decoding so we need to ensure that we return gracefully only
2597  * when we are sending the data in streaming mode and the streaming is
2598  * not finished yet or when we are sending the data out on a PREPARE
2599  * during a two-phase commit.
2600  */
2601  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2602  (stream_started || rbtxn_prepared(txn)))
2603  {
2604  /* curtxn must be set for streaming or prepared transactions */
2605  Assert(curtxn);
2606 
2607  /* Cleanup the temporary error state. */
2608  FlushErrorState();
2609  FreeErrorData(errdata);
2610  errdata = NULL;
2611  curtxn->concurrent_abort = true;
2612 
2613  /* Reset the TXN so that it is allowed to stream remaining data. */
2614  ReorderBufferResetTXN(rb, txn, snapshot_now,
2615  command_id, prev_lsn,
2616  specinsert);
2617  }
2618  else
2619  {
2620  ReorderBufferCleanupTXN(rb, txn);
2621  MemoryContextSwitchTo(ecxt);
2622  PG_RE_THROW();
2623  }
2624  }
2625  PG_END_TRY();
2626 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:147
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1776
void FlushErrorState(void)
Definition: elog.c:1825
ErrorData * CopyErrorData(void)
Definition: elog.c:1720
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
void * palloc0(Size size)
Definition: mcxt.c:1257
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:702
#define RelationIsValid(relation)
Definition: rel.h:477
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2054
void RelationClose(Relation relation)
Definition: relcache.c:2160
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:50
#define relpathperm(rlocator, forknum)
Definition: relpath.h:90
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1640
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1624
int sqlerrcode
Definition: elog.h:438
Form_pg_class rd_rel
Definition: rel.h:111
RepOriginId origin_id
Definition: reorderbuffer.h:99
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:99
void StartTransactionCommand(void)
Definition: xact.c:2937
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:462
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog(), ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3091 of file reorderbuffer.c.

3092 {
3093  /* many records won't have an xid assigned, centralize check here */
3094  if (xid != InvalidTransactionId)
3095  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3096 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 775 of file reorderbuffer.c.

777 {
778  ReorderBufferTXN *txn;
779 
780  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
781 
782  /*
783  * While streaming the previous changes we have detected that the
784  * transaction is aborted. So there is no point in collecting further
785  * changes for it.
786  */
787  if (txn->concurrent_abort)
788  {
789  /*
790  * We don't need to update memory accounting for this change as we
791  * have not added it to the queue yet.
792  */
793  ReorderBufferReturnChange(rb, change, false);
794  return;
795  }
796 
797  /*
798  * The changes that are sent downstream are considered streamable. We
799  * remember such transactions so that only those will later be considered
800  * for streaming.
801  */
802  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
808  {
809  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
810 
812  }
813 
814  change->lsn = lsn;
815  change->txn = txn;
816 
817  Assert(InvalidXLogRecPtr != lsn);
818  dlist_push_tail(&txn->changes, &change->node);
819  txn->nentries++;
820  txn->nentries_mem++;
821 
822  /* update memory accounting information */
823  ReorderBufferChangeMemoryUpdate(rb, change, true,
824  ReorderBufferChangeSize(change));
825 
826  /* process partial change */
827  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
828 
829  /* check the memory limits and evict something if needed */
831 }
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 838 of file reorderbuffer.c.

842 {
843  if (transactional)
844  {
845  MemoryContext oldcontext;
846  ReorderBufferChange *change;
847 
849 
850  /*
851  * We don't expect snapshots for transactional changes - we'll use the
852  * snapshot derived later during apply (unless the change gets
853  * skipped).
854  */
855  Assert(!snap);
856 
857  oldcontext = MemoryContextSwitchTo(rb->context);
858 
859  change = ReorderBufferGetChange(rb);
861  change->data.msg.prefix = pstrdup(prefix);
862  change->data.msg.message_size = message_size;
863  change->data.msg.message = palloc(message_size);
864  memcpy(change->data.msg.message, message, message_size);
865 
866  ReorderBufferQueueChange(rb, xid, lsn, change, false);
867 
868  MemoryContextSwitchTo(oldcontext);
869  }
870  else
871  {
872  ReorderBufferTXN *txn = NULL;
873  volatile Snapshot snapshot_now = snap;
874 
875  /* Non-transactional changes require a valid snapshot. */
876  Assert(snapshot_now);
877 
878  if (xid != InvalidTransactionId)
879  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
880 
881  /* setup snapshot to allow catalog access */
882  SetupHistoricSnapshot(snapshot_now, NULL);
883  PG_TRY();
884  {
885  rb->message(rb, txn, lsn, false, prefix, message_size, message);
886 
888  }
889  PG_CATCH();
890  {
892  PG_RE_THROW();
893  }
894  PG_END_TRY();
895  }
896 }

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2722 of file reorderbuffer.c.

2726 {
2727  ReorderBufferTXN *txn;
2728 
2729  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2730 
2731  /* unknown transaction, nothing to do */
2732  if (txn == NULL)
2733  return false;
2734 
2735  /*
2736  * Remember the prepare information to be later used by commit prepared in
2737  * case we skip doing prepare.
2738  */
2739  txn->final_lsn = prepare_lsn;
2740  txn->end_lsn = end_lsn;
2741  txn->xact_time.prepare_time = prepare_time;
2742  txn->origin_id = origin_id;
2743  txn->origin_lsn = origin_lsn;
2744 
2745  return true;
2746 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2639 of file reorderbuffer.c.

2644 {
2645  Snapshot snapshot_now;
2646  CommandId command_id = FirstCommandId;
2647 
2648  txn->final_lsn = commit_lsn;
2649  txn->end_lsn = end_lsn;
2650  txn->xact_time.commit_time = commit_time;
2651  txn->origin_id = origin_id;
2652  txn->origin_lsn = origin_lsn;
2653 
2654  /*
2655  * If the transaction was (partially) streamed, we need to commit it in a
2656  * 'streamed' way. That is, we first stream the remaining part of the
2657  * transaction, and then invoke stream_commit message.
2658  *
2659  * Called after everything (origin ID, LSN, ...) is stored in the
2660  * transaction to avoid passing that information directly.
2661  */
2662  if (rbtxn_is_streamed(txn))
2663  {
2664  ReorderBufferStreamCommit(rb, txn);
2665  return;
2666  }
2667 
2668  /*
2669  * If this transaction has no snapshot, it didn't make any changes to the
2670  * database, so there's nothing to decode. Note that
2671  * ReorderBufferCommitChild will have transferred any snapshots from
2672  * subtransactions if there were any.
2673  */
2674  if (txn->base_snapshot == NULL)
2675  {
2676  Assert(txn->ninvalidations == 0);
2677 
2678  /*
2679  * Removing this txn before a commit might result in the computation
2680  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2681  */
2682  if (!rbtxn_prepared(txn))
2683  ReorderBufferCleanupTXN(rb, txn);
2684  return;
2685  }
2686 
2687  snapshot_now = txn->base_snapshot;
2688 
2689  /* Process and send the changes to output plugin. */
2690  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2691  command_id, false);
2692 }
#define FirstCommandId
Definition: c.h:657
uint32 CommandId
Definition: c.h:655
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 2026 of file reorderbuffer.c.

2031 {
2032  /* Discard the changes that we just streamed */
2034 
2035  /* Free all resources allocated for toast reconstruction */
2036  ReorderBufferToastReset(rb, txn);
2037 
2038  /* Return the spec insert change if it is not NULL */
2039  if (specinsert != NULL)
2040  {
2041  ReorderBufferReturnChange(rb, specinsert, true);
2042  specinsert = NULL;
2043  }
2044 
2045  /*
2046  * For the streaming case, stop the stream and remember the command ID and
2047  * snapshot for the streaming run.
2048  */
2049  if (rbtxn_is_streamed(txn))
2050  {
2051  rb->stream_stop(rb, txn, last_lsn);
2052  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2053  }
2054 }

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  data 
)
static

Definition at line 4337 of file reorderbuffer.c.

4339 {
4340  ReorderBufferDiskChange *ondisk;
4341  ReorderBufferChange *change;
4342 
4343  ondisk = (ReorderBufferDiskChange *) data;
4344 
4345  change = ReorderBufferGetChange(rb);
4346 
4347  /* copy static part */
4348  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4349 
4350  data += sizeof(ReorderBufferDiskChange);
4351 
4352  /* restore individual stuff */
4353  switch (change->action)
4354  {
4355  /* fall through these, they're all similar enough */
4360  if (change->data.tp.oldtuple)
4361  {
4362  uint32 tuplelen = ((HeapTuple) data)->t_len;
4363 
4364  change->data.tp.oldtuple =
4366 
4367  /* restore ->tuple */
4368  memcpy(&change->data.tp.oldtuple->tuple, data,
4369  sizeof(HeapTupleData));
4370  data += sizeof(HeapTupleData);
4371 
4372  /* reset t_data pointer into the new tuplebuf */
4373  change->data.tp.oldtuple->tuple.t_data =
4374  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4375 
4376  /* restore tuple data itself */
4377  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4378  data += tuplelen;
4379  }
4380 
4381  if (change->data.tp.newtuple)
4382  {
4383  /* here, data might not be suitably aligned! */
4384  uint32 tuplelen;
4385 
4386  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4387  sizeof(uint32));
4388 
4389  change->data.tp.newtuple =
4391 
4392  /* restore ->tuple */
4393  memcpy(&change->data.tp.newtuple->tuple, data,
4394  sizeof(HeapTupleData));
4395  data += sizeof(HeapTupleData);
4396 
4397  /* reset t_data pointer into the new tuplebuf */
4398  change->data.tp.newtuple->tuple.t_data =
4399  ReorderBufferTupleBufData(change->data.tp.newtuple);
4400 
4401  /* restore tuple data itself */
4402  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4403  data += tuplelen;
4404  }
4405 
4406  break;
4408  {
4409  Size prefix_size;
4410 
4411  /* read prefix */
4412  memcpy(&prefix_size, data, sizeof(Size));
4413  data += sizeof(Size);
4414  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4415  prefix_size);
4416  memcpy(change->data.msg.prefix, data, prefix_size);
4417  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4418  data += prefix_size;
4419 
4420  /* read the message */
4421  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4422  data += sizeof(Size);
4423  change->data.msg.message = MemoryContextAlloc(rb->context,
4424  change->data.msg.message_size);
4425  memcpy(change->data.msg.message, data,
4426  change->data.msg.message_size);
4427  data += change->data.msg.message_size;
4428 
4429  break;
4430  }
4432  {
4433  Size inval_size = sizeof(SharedInvalidationMessage) *
4434  change->data.inval.ninvalidations;
4435 
4436  change->data.inval.invalidations =
4437  MemoryContextAlloc(rb->context, inval_size);
4438 
4439  /* read the message */
4440  memcpy(change->data.inval.invalidations, data, inval_size);
4441 
4442  break;
4443  }
4445  {
4446  Snapshot oldsnap;
4447  Snapshot newsnap;
4448  Size size;
4449 
4450  oldsnap = (Snapshot) data;
4451 
4452  size = sizeof(SnapshotData) +
4453  sizeof(TransactionId) * oldsnap->xcnt +
4454  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4455 
4456  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4457 
4458  newsnap = change->data.snapshot;
4459 
4460  memcpy(newsnap, data, size);
4461  newsnap->xip = (TransactionId *)
4462  (((char *) newsnap) + sizeof(SnapshotData));
4463  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4464  newsnap->copied = true;
4465  break;
4466  }
4467  /* the base struct contains all the data, easy peasy */
4469  {
4470  Oid *relids;
4471 
4472  relids = ReorderBufferGetRelids(rb,
4473  change->data.truncate.nrelids);
4474  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4475  change->data.truncate.relids = relids;
4476 
4477  break;
4478  }
4483  break;
4484  }
4485 
4486  dlist_push_tail(&txn->changes, &change->node);
4487  txn->nentries_mem++;
4488 
4489  /*
4490  * Update memory accounting for the restored change. We need to do this
4491  * although we don't check the memory limit when restoring the changes in
4492  * this branch (we only do that when initially queueing the changes after
4493  * decoding), because we will release the changes later, and that will
4494  * update the accounting too (subtracting the size from the counters). And
4495  * we don't want to underflow there.
4496  */
4497  ReorderBufferChangeMemoryUpdate(rb, change, true,
4498  ReorderBufferChangeSize(change));
4499 }
HeapTupleData * HeapTuple
Definition: htup.h:71
struct ReorderBufferDiskChange ReorderBufferDiskChange
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4194 of file reorderbuffer.c.

4196 {
4197  Size restored = 0;
4198  XLogSegNo last_segno;
4199  dlist_mutable_iter cleanup_iter;
4200  File *fd = &file->vfd;
4201 
4204 
4205  /* free current entries, so we have memory for more */
4206  dlist_foreach_modify(cleanup_iter, &txn->changes)
4207  {
4209  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4210 
4211  dlist_delete(&cleanup->node);
4213  }
4214  txn->nentries_mem = 0;
4215  Assert(dlist_is_empty(&txn->changes));
4216 
4217  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4218 
4219  while (restored < max_changes_in_memory && *segno <= last_segno)
4220  {
4221  int readBytes;
4222  ReorderBufferDiskChange *ondisk;
4223 
4225 
4226  if (*fd == -1)
4227  {
4228  char path[MAXPGPATH];
4229 
4230  /* first time in */
4231  if (*segno == 0)
4232  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4233 
4234  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4235 
4236  /*
4237  * No need to care about TLIs here, only used during a single run,
4238  * so each LSN only maps to a specific WAL record.
4239  */
4241  *segno);
4242 
4243  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4244 
4245  /* No harm in resetting the offset even in case of failure */
4246  file->curOffset = 0;
4247 
4248  if (*fd < 0 && errno == ENOENT)
4249  {
4250  *fd = -1;
4251  (*segno)++;
4252  continue;
4253  }
4254  else if (*fd < 0)
4255  ereport(ERROR,
4257  errmsg("could not open file \"%s\": %m",
4258  path)));
4259  }
4260 
4261  /*
4262  * Read the statically sized part of a change which has information
4263  * about the total size. If we couldn't read a record, we're at the
4264  * end of this file.
4265  */
4267  readBytes = FileRead(file->vfd, rb->outbuf,
4268  sizeof(ReorderBufferDiskChange),
4269  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4270 
4271  /* eof */
4272  if (readBytes == 0)
4273  {
4274  FileClose(*fd);
4275  *fd = -1;
4276  (*segno)++;
4277  continue;
4278  }
4279  else if (readBytes < 0)
4280  ereport(ERROR,
4282  errmsg("could not read from reorderbuffer spill file: %m")));
4283  else if (readBytes != sizeof(ReorderBufferDiskChange))
4284  ereport(ERROR,
4286  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4287  readBytes,
4288  (uint32) sizeof(ReorderBufferDiskChange))));
4289 
4290  file->curOffset += readBytes;
4291 
4292  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4293 
4295  sizeof(ReorderBufferDiskChange) + ondisk->size);
4296  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4297 
4298  readBytes = FileRead(file->vfd,
4299  rb->outbuf + sizeof(ReorderBufferDiskChange),
4300  ondisk->size - sizeof(ReorderBufferDiskChange),
4301  file->curOffset,
4302  WAIT_EVENT_REORDER_BUFFER_READ);
4303 
4304  if (readBytes < 0)
4305  ereport(ERROR,
4307  errmsg("could not read from reorderbuffer spill file: %m")));
4308  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4309  ereport(ERROR,
4311  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4312  readBytes,
4313  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4314 
4315  file->curOffset += readBytes;
4316 
4317  /*
4318  * ok, read a full change from disk, now restore it into proper
4319  * in-memory format
4320  */
4321  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4322  restored++;
4323  }
4324 
4325  return restored;
4326 }
static void cleanup(void)
Definition: bootstrap.c:687
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1527
int FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2088
int File
Definition: fd.h:49
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:146
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4505 of file reorderbuffer.c.

4506 {
4507  XLogSegNo first;
4508  XLogSegNo cur;
4509  XLogSegNo last;
4510 
4513 
4514  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4515  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4516 
4517  /* iterate over all possible filenames, and delete them */
4518  for (cur = first; cur <= last; cur++)
4519  {
4520  char path[MAXPGPATH];
4521 
4523  if (unlink(path) != 0 && errno != ENOENT)
4524  ereport(ERROR,
4526  errmsg("could not remove file \"%s\": %m", path)));
4527  }
4528 }
struct cursor * cur
Definition: ecpg.c:28

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 484 of file reorderbuffer.c.

486 {
487  /* update memory accounting info */
488  if (upd_mem)
489  ReorderBufferChangeMemoryUpdate(rb, change, false,
490  ReorderBufferChangeSize(change));
491 
492  /* free contained data */
493  switch (change->action)
494  {
499  if (change->data.tp.newtuple)
500  {
501  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
502  change->data.tp.newtuple = NULL;
503  }
504 
505  if (change->data.tp.oldtuple)
506  {
507  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
508  change->data.tp.oldtuple = NULL;
509  }
510  break;
512  if (change->data.msg.prefix != NULL)
513  pfree(change->data.msg.prefix);
514  change->data.msg.prefix = NULL;
515  if (change->data.msg.message != NULL)
516  pfree(change->data.msg.message);
517  change->data.msg.message = NULL;
518  break;
520  if (change->data.inval.invalidations)
521  pfree(change->data.inval.invalidations);
522  change->data.inval.invalidations = NULL;
523  break;
525  if (change->data.snapshot)
526  {
527  ReorderBufferFreeSnap(rb, change->data.snapshot);
528  change->data.snapshot = NULL;
529  }
530  break;
531  /* no data in addition to the struct itself */
533  if (change->data.truncate.relids != NULL)
534  {
535  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
536  change->data.truncate.relids = NULL;
537  }
538  break;
543  break;
544  }
545 
546  pfree(change);
547 }
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 606 of file reorderbuffer.c.

607 {
608  pfree(relids);
609 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 575 of file reorderbuffer.c.

576 {
577  pfree(tuple);
578 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 430 of file reorderbuffer.c.

431 {
432  /* clean the lookup cache if we were cached (quite likely) */
433  if (rb->by_txn_last_xid == txn->xid)
434  {
436  rb->by_txn_last_txn = NULL;
437  }
438 
439  /* free data that's contained */
440 
441  if (txn->gid != NULL)
442  {
443  pfree(txn->gid);
444  txn->gid = NULL;
445  }
446 
447  if (txn->tuplecid_hash != NULL)
448  {
450  txn->tuplecid_hash = NULL;
451  }
452 
453  if (txn->invalidations)
454  {
455  pfree(txn->invalidations);
456  txn->invalidations = NULL;
457  }
458 
459  /* Reset the toast hash */
460  ReorderBufferToastReset(rb, txn);
461 
462  pfree(txn);
463 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 2005 of file reorderbuffer.c.

2007 {
2008  txn->command_id = command_id;
2009 
2010  /* Avoid copying if it's already copied. */
2011  if (snapshot_now->copied)
2012  txn->snapshot_now = snapshot_now;
2013  else
2014  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2015  txn, command_id);
2016 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 3742 of file reorderbuffer.c.

3744 {
3745  ReorderBufferDiskChange *ondisk;
3746  Size sz = sizeof(ReorderBufferDiskChange);
3747 
3749 
3750  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3751  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3752 
3753  switch (change->action)
3754  {
3755  /* fall through these, they're all similar enough */
3760  {
3761  char *data;
3762  ReorderBufferTupleBuf *oldtup,
3763  *newtup;
3764  Size oldlen = 0;
3765  Size newlen = 0;
3766 
3767  oldtup = change->data.tp.oldtuple;
3768  newtup = change->data.tp.newtuple;
3769 
3770  if (oldtup)
3771  {
3772  sz += sizeof(HeapTupleData);
3773  oldlen = oldtup->tuple.t_len;
3774  sz += oldlen;
3775  }
3776 
3777  if (newtup)
3778  {
3779  sz += sizeof(HeapTupleData);
3780  newlen = newtup->tuple.t_len;
3781  sz += newlen;
3782  }
3783 
3784  /* make sure we have enough space */
3786 
3787  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3788  /* might have been reallocated above */
3789  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3790 
3791  if (oldlen)
3792  {
3793  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3794  data += sizeof(HeapTupleData);
3795 
3796  memcpy(data, oldtup->tuple.t_data, oldlen);
3797  data += oldlen;
3798  }
3799 
3800  if (newlen)
3801  {
3802  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3803  data += sizeof(HeapTupleData);
3804 
3805  memcpy(data, newtup->tuple.t_data, newlen);
3806  data += newlen;
3807  }
3808  break;
3809  }
3811  {
3812  char *data;
3813  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3814 
3815  sz += prefix_size + change->data.msg.message_size +
3816  sizeof(Size) + sizeof(Size);
3818 
3819  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3820 
3821  /* might have been reallocated above */
3822  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3823 
3824  /* write the prefix including the size */
3825  memcpy(data, &prefix_size, sizeof(Size));
3826  data += sizeof(Size);
3827  memcpy(data, change->data.msg.prefix,
3828  prefix_size);
3829  data += prefix_size;
3830 
3831  /* write the message including the size */
3832  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3833  data += sizeof(Size);
3834  memcpy(data, change->data.msg.message,
3835  change->data.msg.message_size);
3836  data += change->data.msg.message_size;
3837 
3838  break;
3839  }
3841  {
3842  char *data;
3843  Size inval_size = sizeof(SharedInvalidationMessage) *
3844  change->data.inval.ninvalidations;
3845 
3846  sz += inval_size;
3847 
3849  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3850 
3851  /* might have been reallocated above */
3852  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3853  memcpy(data, change->data.inval.invalidations, inval_size);
3854  data += inval_size;
3855 
3856  break;
3857  }
3859  {
3860  Snapshot snap;
3861  char *data;
3862 
3863  snap = change->data.snapshot;
3864 
3865  sz += sizeof(SnapshotData) +
3866  sizeof(TransactionId) * snap->xcnt +
3867  sizeof(TransactionId) * snap->subxcnt;
3868 
3869  /* make sure we have enough space */
3871  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3872  /* might have been reallocated above */
3873  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3874 
3875  memcpy(data, snap, sizeof(SnapshotData));
3876  data += sizeof(SnapshotData);
3877 
3878  if (snap->xcnt)
3879  {
3880  memcpy(data, snap->xip,
3881  sizeof(TransactionId) * snap->xcnt);
3882  data += sizeof(TransactionId) * snap->xcnt;
3883  }
3884 
3885  if (snap->subxcnt)
3886  {
3887  memcpy(data, snap->subxip,
3888  sizeof(TransactionId) * snap->subxcnt);
3889  data += sizeof(TransactionId) * snap->subxcnt;
3890  }
3891  break;
3892  }
3894  {
3895  Size size;
3896  char *data;
3897 
3898  /* account for the OIDs of truncated relations */
3899  size = sizeof(Oid) * change->data.truncate.nrelids;
3900  sz += size;
3901 
3902  /* make sure we have enough space */
3904 
3905  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3906  /* might have been reallocated above */
3907  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3908 
3909  memcpy(data, change->data.truncate.relids, size);
3910  data += size;
3911 
3912  break;
3913  }
3918  /* ReorderBufferChange contains everything important */
3919  break;
3920  }
3921 
3922  ondisk->size = sz;
3923 
3924  errno = 0;
3925  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3926  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3927  {
3928  int save_errno = errno;
3929 
3931 
3932  /* if write didn't set errno, assume problem is no disk space */
3933  errno = save_errno ? save_errno : ENOSPC;
3934  ereport(ERROR,
3936  errmsg("could not write to data file for XID %u: %m",
3937  txn->xid)));
3938  }
3940 
3941  /*
3942  * Keep the transaction's final_lsn up to date with each change we send to
3943  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3944  * only do this on commit and abort records, but that doesn't work if a
3945  * system crash leaves a transaction without its abort record).
3946  *
3947  * Make sure not to move it backwards.
3948  */
3949  if (txn->final_lsn < change->lsn)
3950  txn->final_lsn = change->lsn;
3951 
3952  Assert(ondisk->change.action == change->action);
3953 }
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4574 of file reorderbuffer.c.

4576 {
4577  XLogRecPtr recptr;
4578 
4579  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4580 
4581  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4583  xid, LSN_FORMAT_ARGS(recptr));
4584 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 3460 of file reorderbuffer.c.

3461 {
3462  if (!rb->outbufsize)
3463  {
3464  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3465  rb->outbufsize = sz;
3466  }
3467  else if (rb->outbufsize < sz)
3468  {
3469  rb->outbuf = repalloc(rb->outbuf, sz);
3470  rb->outbufsize = sz;
3471  }
3472 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3650 of file reorderbuffer.c.

3651 {
3652  dlist_iter subtxn_i;
3653  dlist_mutable_iter change_i;
3654  int fd = -1;
3655  XLogSegNo curOpenSegNo = 0;
3656  Size spilled = 0;
3657  Size size = txn->size;
3658 
3659  elog(DEBUG2, "spill %u changes in XID %u to disk",
3660  (uint32) txn->nentries_mem, txn->xid);
3661 
3662  /* do the same to all child TXs */
3663  dlist_foreach(subtxn_i, &txn->subtxns)
3664  {
3665  ReorderBufferTXN *subtxn;
3666 
3667  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3668  ReorderBufferSerializeTXN(rb, subtxn);
3669  }
3670 
3671  /* serialize changestream */
3672  dlist_foreach_modify(change_i, &txn->changes)
3673  {
3674  ReorderBufferChange *change;
3675 
3676  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3677 
3678  /*
3679  * store in segment in which it belongs by start lsn, don't split over
3680  * multiple segments tho
3681  */
3682  if (fd == -1 ||
3683  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3684  {
3685  char path[MAXPGPATH];
3686 
3687  if (fd != -1)
3689 
3690  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3691 
3692  /*
3693  * No need to care about TLIs here, only used during a single run,
3694  * so each LSN only maps to a specific WAL record.
3695  */
3697  curOpenSegNo);
3698 
3699  /* open segment, create it if necessary */
3700  fd = OpenTransientFile(path,
3701  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3702 
3703  if (fd < 0)
3704  ereport(ERROR,
3706  errmsg("could not open file \"%s\": %m", path)));
3707  }
3708 
3709  ReorderBufferSerializeChange(rb, txn, fd, change);
3710  dlist_delete(&change->node);
3711  ReorderBufferReturnChange(rb, change, true);
3712 
3713  spilled++;
3714  }
3715 
3716  /* update the statistics iff we have spilled anything */
3717  if (spilled)
3718  {
3719  rb->spillCount += 1;
3720  rb->spillBytes += size;
3721 
3722  /* don't consider already serialized transactions */
3723  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3724 
3725  /* update the decoding stats */
3727  }
3728 
3729  Assert(spilled == txn->nentries_mem);
3730  Assert(dlist_is_empty(&txn->changes));
3731  txn->nentries_mem = 0;
3733 
3734  if (fd != -1)
3736 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1912
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3122 of file reorderbuffer.c.

3124 {
3125  ReorderBufferTXN *txn;
3126  bool is_new;
3127 
3128  Assert(snap != NULL);
3129 
3130  /*
3131  * Fetch the transaction to operate on. If we know it's a subtransaction,
3132  * operate on its top-level transaction instead.
3133  */
3134  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3135  if (rbtxn_is_known_subxact(txn))
3136  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3137  NULL, InvalidXLogRecPtr, false);
3138  Assert(txn->base_snapshot == NULL);
3139 
3140  txn->base_snapshot = snap;
3141  txn->base_snapshot_lsn = lsn;
3143 
3144  AssertTXNLsnOrder(rb);
3145 }

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1052 of file reorderbuffer.c.

1053 {
1054  rb->current_restart_decoding_lsn = ptr;
1055 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2750 of file reorderbuffer.c.

2751 {
2752  ReorderBufferTXN *txn;
2753 
2754  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2755 
2756  /* unknown transaction, nothing to do */
2757  if (txn == NULL)
2758  return;
2759 
2761 }
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer rb,
ReorderBufferTXN txn