PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 193 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
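( \
 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)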
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:57

Definition at line 188 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 184 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5052 of file reorderbuffer.c.

5053 {
5054  char path[MAXPGPATH];
5055  int fd;
5056  int readBytes;
5058 
5059  sprintf(path, "pg_logical/mappings/%s", fname);
5060  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5061  if (fd < 0)
5062  ereport(ERROR,
5064  errmsg("could not open file \"%s\": %m", path)));
5065 
5066  while (true)
5067  {
5070  ReorderBufferTupleCidEnt *new_ent;
5071  bool found;
5072 
5073  /* be careful about padding */
5074  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5075 
5076  /* read all mappings till the end of the file */
5077  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5078  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5080 
5081  if (readBytes < 0)
5082  ereport(ERROR,
5084  errmsg("could not read file \"%s\": %m",
5085  path)));
5086  else if (readBytes == 0) /* EOF */
5087  break;
5088  else if (readBytes != sizeof(LogicalRewriteMappingData))
5089  ereport(ERROR,
5091  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5092  path, readBytes,
5093  (int32) sizeof(LogicalRewriteMappingData))));
5094 
5095  key.rlocator = map.old_locator;
5096  ItemPointerCopy(&map.old_tid,
5097  &key.tid);
5098 
5099 
5100  ent = (ReorderBufferTupleCidEnt *)
5102 
5103  /* no existing mapping, no need to update */
5104  if (!ent)
5105  continue;
5106 
5107  key.rlocator = map.new_locator;
5108  ItemPointerCopy(&map.new_tid,
5109  &key.tid);
5110 
5111  new_ent = (ReorderBufferTupleCidEnt *)
5113 
5114  if (found)
5115  {
5116  /*
5117  * Make sure the existing mapping makes sense. We sometimes update
5118  * old records that did not yet have a cmax (e.g. pg_class' own
5119  * entry while rewriting it) during rewrites, so allow that.
5120  */
5121  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5122  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5123  }
5124  else
5125  {
5126  /* update mapping */
5127  new_ent->cmin = ent->cmin;
5128  new_ent->cmax = ent->cmax;
5129  new_ent->combocid = ent->combocid;
5130  }
5131  }
5132 
5133  if (CloseTransientFile(fd) != 0)
5134  ereport(ERROR,
5136  errmsg("could not close file \"%s\": %m", path)));
5137 }
#define InvalidCommandId
Definition: c.h:669
signed int int32
Definition: c.h:494
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY
Definition: c.h:1273
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:882
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static HTAB * tuplecid_data
Definition: snapmgr.c:102
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 983 of file reorderbuffer.c.

984 {
985 #ifdef USE_ASSERT_CHECKING
986  dlist_iter iter;
987  XLogRecPtr prev_lsn = txn->first_lsn;
988 
989  dlist_foreach(iter, &txn->changes)
990  {
991  ReorderBufferChange *cur_change;
992 
993  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
994 
996  Assert(cur_change->lsn != InvalidXLogRecPtr);
997  Assert(txn->first_lsn <= cur_change->lsn);
998 
999  if (txn->end_lsn != InvalidXLogRecPtr)
1000  Assert(cur_change->lsn <= txn->end_lsn);
1001 
1002  Assert(prev_lsn <= cur_change->lsn);
1003 
1004  prev_lsn = cur_change->lsn;
1005  }
1006 #endif
1007 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 912 of file reorderbuffer.c.

913 {
914 #ifdef USE_ASSERT_CHECKING
916  dlist_iter iter;
917  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
918  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
919 
920  /*
921  * Skip the verification if we don't reach the LSN at which we start
922  * decoding the contents of transactions yet because until we reach the
923  * LSN, we could have transactions that don't have the association between
924  * the top-level transaction and subtransaction yet and consequently have
925  * the same LSN. We don't guarantee this association until we try to
926  * decode the actual contents of transaction. The ordering of the records
927  * prior to the start_decoding_at LSN should have been checked before the
928  * restart.
929  */
931  return;
932 
933  dlist_foreach(iter, &rb->toplevel_by_lsn)
934  {
936  iter.cur);
937 
938  /* start LSN must be set */
939  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
940 
941  /* If there is an end LSN, it must be higher than start LSN */
942  if (cur_txn->end_lsn != InvalidXLogRecPtr)
943  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
944 
945  /* Current initial LSN must be strictly higher than previous */
946  if (prev_first_lsn != InvalidXLogRecPtr)
947  Assert(prev_first_lsn < cur_txn->first_lsn);
948 
949  /* known-as-subtxn txns must not be listed */
950  Assert(!rbtxn_is_known_subxact(cur_txn));
951 
952  prev_first_lsn = cur_txn->first_lsn;
953  }
954 
956  {
958  base_snapshot_node,
959  iter.cur);
960 
961  /* base snapshot (and its LSN) must be set */
962  Assert(cur_txn->base_snapshot != NULL);
964 
965  /* current LSN must be strictly higher than previous */
966  if (prev_base_snap_lsn != InvalidXLogRecPtr)
967  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
968 
969  /* known-as-subtxn txns must not be listed */
970  Assert(!rbtxn_is_known_subxact(cur_txn));
971 
972  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
973  }
974 #endif
975 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:433
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 5154 of file reorderbuffer.c.

5155 {
5158 
5159  return pg_cmp_u64(a->lsn, b->lsn);
5160 }
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:501
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2902 of file reorderbuffer.c.

2904 {
2905  ReorderBufferTXN *txn;
2906 
2907  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2908  false);
2909 
2910  /* unknown, nothing to remove */
2911  if (txn == NULL)
2912  return;
2913 
2914  txn->xact_time.abort_time = abort_time;
2915 
2916  /* For streamed transactions notify the remote node about the abort. */
2917  if (rbtxn_is_streamed(txn))
2918  {
2919  rb->stream_abort(rb, txn, lsn);
2920 
2921  /*
2922  * We might have decoded changes for this transaction that could load
2923  * the cache as per the current transaction's view (consider DDL's
2924  * happened in this transaction). We don't want the decoding of future
2925  * transactions to use those cache entries so execute invalidations.
2926  */
2927  if (txn->ninvalidations > 0)
2929  txn->invalidations);
2930  }
2931 
2932  /* cosmetic... */
2933  txn->final_lsn = lsn;
2934 
2935  /* remove potential on-disk data, and deallocate */
2936  ReorderBufferCleanupTXN(rb, txn);
2937 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@111 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2947 of file reorderbuffer.c.

2948 {
2949  dlist_mutable_iter it;
2950 
2951  /*
2952  * Iterate through all (potential) toplevel TXNs and abort all that are
2953  * older than what possibly can be running. Once we've found the first
2954  * that is alive we stop, there might be some that acquired an xid earlier
2955  * but started writing later, but it's unlikely and they will be cleaned
2956  * up in a later call to this function.
2957  */
2959  {
2960  ReorderBufferTXN *txn;
2961 
2962  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2963 
2964  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2965  {
2966  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2967 
2968  /* Notify the remote node about the crash/immediate restart. */
2969  if (rbtxn_is_streamed(txn))
2970  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2971 
2972  /* remove potential on-disk data, and deallocate this tx */
2973  ReorderBufferCleanupTXN(rb, txn);
2974  }
2975  else
2976  return;
2977  }
2978 }
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:224
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3300 of file reorderbuffer.c.

3303 {
3304  ReorderBufferTXN *txn;
3305  MemoryContext oldcontext;
3306  ReorderBufferChange *change;
3307 
3308  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3309 
3310  oldcontext = MemoryContextSwitchTo(rb->context);
3311 
3312  /*
3313  * Collect all the invalidations under the top transaction, if available,
3314  * so that we can execute them all together. See comments atop this
3315  * function.
3316  */
3317  txn = rbtxn_get_toptxn(txn);
3318 
3319  Assert(nmsgs > 0);
3320 
3321  /* Accumulate invalidations. */
3322  if (txn->ninvalidations == 0)
3323  {
3324  txn->ninvalidations = nmsgs;
3326  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3327  memcpy(txn->invalidations, msgs,
3328  sizeof(SharedInvalidationMessage) * nmsgs);
3329  }
3330  else
3331  {
3334  (txn->ninvalidations + nmsgs));
3335 
3336  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3337  nmsgs * sizeof(SharedInvalidationMessage));
3338  txn->ninvalidations += nmsgs;
3339  }
3340 
3341  change = ReorderBufferGetChange(rb);
3343  change->data.inval.ninvalidations = nmsgs;
3344  change->data.inval.invalidations = (SharedInvalidationMessage *)
3345  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3346  memcpy(change->data.inval.invalidations, msgs,
3347  sizeof(SharedInvalidationMessage) * nmsgs);
3348 
3349  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3350 
3351  MemoryContextSwitchTo(oldcontext);
3352 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1540
void * palloc(Size size)
Definition: mcxt.c:1316
MemoryContextSwitchTo(old_ctx)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:51
ReorderBufferChangeType action
Definition: reorderbuffer.h:76
union ReorderBufferChange::@105 data
struct ReorderBufferChange::@105::@110 inval
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3263 of file reorderbuffer.c.

3267 {
3269  ReorderBufferTXN *txn;
3270 
3271  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3272 
3273  change->data.tuplecid.locator = locator;
3274  change->data.tuplecid.tid = tid;
3275  change->data.tuplecid.cmin = cmin;
3276  change->data.tuplecid.cmax = cmax;
3277  change->data.tuplecid.combocid = combocid;
3278  change->lsn = lsn;
3279  change->txn = txn;
3281 
3282  dlist_push_tail(&txn->tuplecids, &change->node);
3283  txn->ntuplecids++;
3284 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:54
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:79
struct ReorderBufferChange::@105::@109 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 309 of file reorderbuffer.c.

310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
319  "ReorderBuffer",
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * XXX the allocation sizes used below pre-date generation context's block
341  * growing code. These values should likely be benchmarked and set to
342  * more suitable values.
343  */
344  buffer->tup_context = GenerationContextCreate(new_ctx,
345  "Tuples",
349 
350  hash_ctl.keysize = sizeof(TransactionId);
351  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
352  hash_ctl.hcxt = buffer->context;
353 
354  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
356 
358  buffer->by_txn_last_txn = NULL;
359 
360  buffer->outbuf = NULL;
361  buffer->outbufsize = 0;
362  buffer->size = 0;
363 
364  /* txn_heap is ordered by transaction size */
366 
367  buffer->spillTxns = 0;
368  buffer->spillCount = 0;
369  buffer->spillBytes = 0;
370  buffer->streamTxns = 0;
371  buffer->streamCount = 0;
372  buffer->streamBytes = 0;
373  buffer->totalTxns = 0;
374  buffer->totalBytes = 0;
375 
377 
378  dlist_init(&buffer->toplevel_by_lsn);
380  dclist_init(&buffer->catchange_txns);
381 
382  /*
383  * Ensure there's no stale data from prior uses of this slot, in case some
384  * prior exit avoided calling ReorderBufferFree. Failure to do this can
385  * produce duplicated txns, and it's very cheap if there's nothing there.
386  */
388 
389  return buffer;
390 }
#define NameStr(name)
Definition: c.h:746
uint32 TransactionId
Definition: c.h:652
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1180
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:190
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:178
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1968 of file reorderbuffer.c.

/*
 * Body of ReorderBufferApplyChange(): hand one change for 'relation' to a
 * callback stored in 'rb'.  rb->stream_change is used when 'streaming' is
 * true, rb->apply_change otherwise; both receive the same
 * (rb, txn, relation, change) arguments.  Nothing is returned.
 */
1971 {
1972  if (streaming)
1973  rb->stream_change(rb, txn, relation, change);
1974  else
1975  rb->apply_change(rb, txn, relation, change);
1976 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1996 of file reorderbuffer.c.

/*
 * Body of ReorderBufferApplyMessage(): forward a message change to a
 * callback stored in 'rb'.  rb->stream_message is used when 'streaming' is
 * true, rb->message otherwise.  Both calls pass change->lsn plus the prefix,
 * message_size and message fields of change->data.msg.
 *
 * NOTE(review): the literal 'true' passed as the fourth argument is
 * presumably the callback's "transactional" flag -- confirm against the
 * callback typedefs, which are not visible in this listing.
 */
1998 {
1999  if (streaming)
2000  rb->stream_message(rb, txn, change->lsn, true,
2001  change->data.msg.prefix,
2002  change->data.msg.message_size,
2003  change->data.msg.message);
2004  else
2005  rb->message(rb, txn, change->lsn, true,
2006  change->data.msg.prefix,
2007  change->data.msg.message_size,
2008  change->data.msg.message);
2009 }
struct ReorderBufferChange::@105::@108 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 1982 of file reorderbuffer.c.

/*
 * Body of ReorderBufferApplyTruncate(): forward a truncate change covering
 * 'nrelations' entries of 'relations' to a callback stored in 'rb'.
 * rb->stream_truncate is used when 'streaming' is true, rb->apply_truncate
 * otherwise; both receive (rb, txn, nrelations, relations, change).
 */
1985 {
1986  if (streaming)
1987  rb->stream_truncate(rb, txn, nrelations, relations, change);
1988  else
1989  rb->apply_truncate(rb, txn, nrelations, relations, change);
1990 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1069 of file reorderbuffer.c.

/*
 * Body of ReorderBufferAssignChild(): record that 'subxid' is a
 * subtransaction of 'xid'.
 *
 * Both transactions are obtained through ReorderBufferTXNByXid().  If the
 * subtransaction entry already existed and is already flagged as a known
 * subxact, the function returns without changing anything.  If it existed
 * but was not yet known as a subxact, it is first unlinked from the list it
 * is currently on.  The entry is then flagged RBTXN_IS_SUBXACT, pointed at
 * its top-level transaction (toplevel_xid / toptxn), appended to
 * txn->subtxns, and txn->nsubtxns is incremented.  Finally
 * ReorderBufferTransferSnapToParent() and AssertTXNLsnOrder() are called.
 *
 * NOTE(review): the third argument 'true' to ReorderBufferTXNByXid()
 * presumably requests creation of a missing entry; its signature is not
 * visible in this listing -- confirm.
 */
1071 {
1072  ReorderBufferTXN *txn;
1073  ReorderBufferTXN *subtxn;
1074  bool new_top;
1075  bool new_sub;
1076 
 /* new_top is filled in by the first lookup but is not read again below */
1077  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1078  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1079 
1080  if (!new_sub)
1081  {
1082  if (rbtxn_is_known_subxact(subtxn))
1083  {
1084  /* already associated, nothing to do */
1085  return;
1086  }
1087  else
1088  {
1089  /*
1090  * We already saw this transaction, but initially added it to the
1091  * list of top-level txns. Now that we know it's not top-level,
1092  * remove it from there.
1093  */
1094  dlist_delete(&subtxn->node);
1095  }
1096  }
1097 
1098  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1099  subtxn->toplevel_xid = xid;
 /* the entry being attached must not have subtransactions of its own */
1100  Assert(subtxn->nsubtxns == 0);
1101 
1102  /* set the reference to top-level transaction */
1103  subtxn->toptxn = txn;
1104 
1105  /* add to subtransaction list */
1106  dlist_push_tail(&txn->subtxns, &subtxn->node);
1107  txn->nsubtxns++;
1108 
1109  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1110  ReorderBufferTransferSnapToParent(txn, subtxn);
1111 
1112  /* Verify LSN-ordering invariant */
1113  AssertTXNLsnOrder(rb);
1114 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1734 of file reorderbuffer.c.

1735 {
1736  dlist_iter iter;
1737  HASHCTL hash_ctl;
1738 
1740  return;
1741 
1742  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1743  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1744  hash_ctl.hcxt = rb->context;
1745 
1746  /*
1747  * create the hash with the exact number of to-be-stored tuplecids from
1748  * the start
1749  */
1750  txn->tuplecid_hash =
1751  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1753 
1754  dlist_foreach(iter, &txn->tuplecids)
1755  {
1758  bool found;
1759  ReorderBufferChange *change;
1760 
1761  change = dlist_container(ReorderBufferChange, node, iter.cur);
1762 
1764 
1765  /* be careful about padding */
1766  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1767 
1768  key.rlocator = change->data.tuplecid.locator;
1769 
1770  ItemPointerCopy(&change->data.tuplecid.tid,
1771  &key.tid);
1772 
1773  ent = (ReorderBufferTupleCidEnt *)
1774  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1775  if (!found)
1776  {
1777  ent->cmin = change->data.tuplecid.cmin;
1778  ent->cmax = change->data.tuplecid.cmax;
1779  ent->combocid = change->data.tuplecid.combocid;
1780  }
1781  else
1782  {
1783  /*
1784  * Maybe we already saw this tuple before in this transaction, but
1785  * if so it must have the same cmin.
1786  */
1787  Assert(ent->cmin == change->data.tuplecid.cmin);
1788 
1789  /*
1790  * cmax may be initially invalid, but once set it can only grow,
1791  * and never become invalid again.
1792  */
1793  Assert((ent->cmax == InvalidCommandId) ||
1794  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1795  (change->data.tuplecid.cmax > ent->cmax)));
1796  ent->cmax = change->data.tuplecid.cmax;
1797  }
1798  }
1799 }
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 4000 of file reorderbuffer.c.

4001 {
4003  SnapBuild *builder = ctx->snapshot_builder;
4004 
4005  /* We can't start streaming unless a consistent state is reached. */
4007  return false;
4008 
4009  /*
4010  * We can't start streaming immediately even if the streaming is enabled
4011  * because we previously decoded this transaction and now just are
4012  * restarting.
4013  */
4014  if (ReorderBufferCanStream(rb) &&
4015  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4016  return true;
4017 
4018  return false;
4019 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:406
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

Definition at line 3991 of file reorderbuffer.c.

3992 {
3994 
3995  return ctx->streaming;
3996 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
ReorderBufferTXN txn,
bool  addition,
Size  sz 
)
static

Definition at line 3192 of file reorderbuffer.c.

/*
 * Body of ReorderBufferChangeMemoryUpdate(): add 'sz' bytes to, or subtract
 * them from, the memory accounting counters.
 *
 * At least one of 'txn' and 'change' must be non-NULL; when 'txn' is NULL
 * the transaction is taken from change->txn.  The call is a no-op when
 * 'change' is a REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID change or when 'sz'
 * is zero.
 *
 * Three counters move together: txn->size, rb->size, and total_size of the
 * transaction returned by rbtxn_get_toptxn(txn).  'addition' selects the
 * direction.  The transaction's node in rb->txn_heap is removed and
 * re-added around the size change; as maintained here, a transaction is in
 * the heap only while its size is non-zero.
 */
3196 {
3197  ReorderBufferTXN *toptxn;
3198 
3199  Assert(txn || change);
3200 
3201  /*
3202  * Ignore tuple CID changes, because those are not evicted when reaching
3203  * memory limit. So we just don't count them, because it might easily
3204  * trigger a pointless attempt to spill.
3205  */
3206  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3207  return;
3208 
3209  if (sz == 0)
3210  return;
3211 
3212  if (txn == NULL)
3213  txn = change->txn;
3214  Assert(txn != NULL);
3215 
3216  /*
3217  * Update the total size in top level as well. This is later used to
3218  * compute the decoding stats.
3219  */
3220  toptxn = rbtxn_get_toptxn(txn);
3221 
3222  if (addition)
3223  {
3224  Size oldsize = txn->size;
3225 
3226  txn->size += sz;
3227  rb->size += sz;
3228 
3229  /* Update the total size in the top transaction. */
3230  toptxn->total_size += sz;
3231 
3232  /* Update the max-heap */
 /* a txn whose previous size was zero is not in the heap yet, so only add it */
3233  if (oldsize != 0)
3234  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3235  pairingheap_add(rb->txn_heap, &txn->txn_node);
3236  }
3237  else
3238  {
3239  Assert((rb->size >= sz) && (txn->size >= sz));
3240  txn->size -= sz;
3241  rb->size -= sz;
3242 
3243  /* Update the total size in the top transaction. */
3244  toptxn->total_size -= sz;
3245 
3246  /* Update the max-heap */
 /* a txn that dropped to size zero is left out of the heap */
3247  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3248  if (txn->size != 0)
3249  pairingheap_add(rb->txn_heap, &txn->txn_node);
3250  }
3251 
3252  Assert(txn->size <= rb->size);
3253 }
size_t Size
Definition: c.h:605
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:170
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:112
pairingheap_node txn_node

References ReorderBufferChange::action, Assert, pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4143 of file reorderbuffer.c.

4144 {
4145  Size sz = sizeof(ReorderBufferChange);
4146 
4147  switch (change->action)
4148  {
4149  /* fall through these, they're all similar enough */
4154  {
4155  HeapTuple oldtup,
4156  newtup;
4157  Size oldlen = 0;
4158  Size newlen = 0;
4159 
4160  oldtup = change->data.tp.oldtuple;
4161  newtup = change->data.tp.newtuple;
4162 
4163  if (oldtup)
4164  {
4165  sz += sizeof(HeapTupleData);
4166  oldlen = oldtup->t_len;
4167  sz += oldlen;
4168  }
4169 
4170  if (newtup)
4171  {
4172  sz += sizeof(HeapTupleData);
4173  newlen = newtup->t_len;
4174  sz += newlen;
4175  }
4176 
4177  break;
4178  }
4180  {
4181  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4182 
4183  sz += prefix_size + change->data.msg.message_size +
4184  sizeof(Size) + sizeof(Size);
4185 
4186  break;
4187  }
4189  {
4190  sz += sizeof(SharedInvalidationMessage) *
4191  change->data.inval.ninvalidations;
4192  break;
4193  }
4195  {
4196  Snapshot snap;
4197 
4198  snap = change->data.snapshot;
4199 
4200  sz += sizeof(SnapshotData) +
4201  sizeof(TransactionId) * snap->xcnt +
4202  sizeof(TransactionId) * snap->subxcnt;
4203 
4204  break;
4205  }
4207  {
4208  sz += sizeof(Oid) * change->data.truncate.nrelids;
4209 
4210  break;
4211  }
4216  /* ReorderBufferChange contains everything important */
4217  break;
4218  }
4219 
4220  return sz;
4221 }
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:50
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:49
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@105::@107 truncate
struct ReorderBufferChange::@105::@106 tp
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3608 of file reorderbuffer.c.

3609 {
3610  ReorderBufferTXN *txn;
3611 
3612  /*
3613  * Bail out if debug_logical_replication_streaming is buffered and we
3614  * haven't exceeded the memory limit.
3615  */
3617  rb->size < logical_decoding_work_mem * 1024L)
3618  return;
3619 
3620  /*
3621  * If debug_logical_replication_streaming is immediate, loop until there's
3622  * no change. Otherwise, loop until we reach under the memory limit. One
3623  * might think that just by evicting the largest (sub)transaction we will
3624  * come under the memory limit based on assumption that the selected
3625  * transaction is at least as large as the most recent change (which
3626  * caused us to go over the memory limit). However, that is not true
3627  * because a user can reduce the logical_decoding_work_mem to a smaller
3628  * value before the most recent change.
3629  */
3630  while (rb->size >= logical_decoding_work_mem * 1024L ||
3632  rb->size > 0))
3633  {
3634  /*
3635  * Pick the largest transaction and evict it from memory by streaming,
3636  * if possible. Otherwise, spill to disk.
3637  */
3639  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3640  {
3641  /* we know there has to be one, because the size is not zero */
3642  Assert(txn && rbtxn_is_toptxn(txn));
3643  Assert(txn->total_size > 0);
3644  Assert(rb->size >= txn->total_size);
3645 
3646  ReorderBufferStreamTXN(rb, txn);
3647  }
3648  else
3649  {
3650  /*
3651  * Pick the largest transaction (or subtransaction) and evict it
3652  * from memory by serializing it to disk.
3653  */
3654  txn = ReorderBufferLargestTXN(rb);
3655 
3656  /* we know there has to be one, because the size is not zero */
3657  Assert(txn);
3658  Assert(txn->size > 0);
3659  Assert(rb->size >= txn->size);
3660 
3661  ReorderBufferSerializeTXN(rb, txn);
3662  }
3663 
3664  /*
3665  * After eviction, the transaction should have no entries in memory,
3666  * and should use 0 bytes for changes.
3667  */
3668  Assert(txn->size == 0);
3669  Assert(txn->nentries_mem == 0);
3670  }
3671 
3672  /* We must be under the memory limit now. */
3673  Assert(rb->size < logical_decoding_work_mem * 1024L);
3674 
3675 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:29
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:28
#define rbtxn_is_toptxn(txn)

References Assert, DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4569 of file reorderbuffer.c.

4570 {
4571  DIR *spill_dir;
4572  struct dirent *spill_de;
4573  struct stat statbuf;
4574  char path[MAXPGPATH * 2 + 12];
4575 
4576  sprintf(path, "pg_replslot/%s", slotname);
4577 
4578  /* we're only handling directories here, skip if it's not ours */
4579  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4580  return;
4581 
4582  spill_dir = AllocateDir(path);
4583  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4584  {
4585  /* only look at names that can be ours */
4586  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4587  {
4588  snprintf(path, sizeof(path),
4589  "pg_replslot/%s/%s", slotname,
4590  spill_de->d_name);
4591 
4592  if (unlink(path) != 0)
4593  ereport(ERROR,
4595  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4596  path, slotname)));
4597  }
4598  }
4599  FreeDir(spill_dir);
4600 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2961
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2924
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2843
#define snprintf
Definition: port.h:238
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1505 of file reorderbuffer.c.

1506 {
1507  bool found;
1508  dlist_mutable_iter iter;
1509 
1510  /* cleanup subtransactions & their changes */
1511  dlist_foreach_modify(iter, &txn->subtxns)
1512  {
1513  ReorderBufferTXN *subtxn;
1514 
1515  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1516 
1517  /*
1518  * Subtransactions are always associated to the toplevel TXN, even if
1519  * they originally were happening inside another subtxn, so we won't
1520  * ever recurse more than one level deep here.
1521  */
1522  Assert(rbtxn_is_known_subxact(subtxn));
1523  Assert(subtxn->nsubtxns == 0);
1524 
1525  ReorderBufferCleanupTXN(rb, subtxn);
1526  }
1527 
1528  /* cleanup changes in the txn */
1529  dlist_foreach_modify(iter, &txn->changes)
1530  {
1531  ReorderBufferChange *change;
1532 
1533  change = dlist_container(ReorderBufferChange, node, iter.cur);
1534 
1535  /* Check we're not mixing changes from different transactions. */
1536  Assert(change->txn == txn);
1537 
1538  ReorderBufferReturnChange(rb, change, false);
1539  }
1540 
1541  /*
1542  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1543  * They are always stored in the toplevel transaction.
1544  */
1545  dlist_foreach_modify(iter, &txn->tuplecids)
1546  {
1547  ReorderBufferChange *change;
1548 
1549  change = dlist_container(ReorderBufferChange, node, iter.cur);
1550 
1551  /* Check we're not mixing changes from different transactions. */
1552  Assert(change->txn == txn);
1554 
1555  ReorderBufferReturnChange(rb, change, true);
1556  }
1557 
1558  /*
1559  * Cleanup the base snapshot, if set.
1560  */
1561  if (txn->base_snapshot != NULL)
1562  {
1565  }
1566 
1567  /*
1568  * Cleanup the snapshot for the last streamed run.
1569  */
1570  if (txn->snapshot_now != NULL)
1571  {
1572  Assert(rbtxn_is_streamed(txn));
1574  }
1575 
1576  /*
1577  * Remove TXN from its containing lists.
1578  *
1579  * Note: if txn is known as subxact, we are deleting the TXN from its
1580  * parent's list of known subxacts; this leaves the parent's nsubxacts
1581  * count too high, but we don't care. Otherwise, we are deleting the TXN
1582  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1583  * list of catalog modifying transactions as well.
1584  */
1585  dlist_delete(&txn->node);
1586  if (rbtxn_has_catalog_changes(txn))
1588 
1589  /* now remove reference from buffer */
1590  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1591  Assert(found);
1592 
1593  /* remove entries spilled to disk */
1594  if (rbtxn_is_serialized(txn))
1595  ReorderBufferRestoreCleanup(rb, txn);
1596 
1597  /* Update the memory counter */
1598  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1599 
1600  /* deallocate */
1601  ReorderBufferReturnTXN(rb, txn);
1602 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:457
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::size, SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2711 of file reorderbuffer.c.

/*
 * Body of ReorderBufferCommit(): look up the transaction for 'xid' and, if
 * one is found, pass it with the commit details (commit_lsn, end_lsn,
 * commit_time, origin_id, origin_lsn) to ReorderBufferReplay().  Returns
 * without doing anything when the lookup yields NULL.
 *
 * NOTE(review): the 'false' third argument to ReorderBufferTXNByXid()
 * presumably means "do not create a missing entry"; its signature is not
 * visible in this listing -- confirm.
 */
2715 {
2716  ReorderBufferTXN *txn;
2717 
2718  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2719  false);
2720 
2721  /* unknown transaction, nothing to replay */
2722  if (txn == NULL)
2723  return;
2724 
2725  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2726  origin_id, origin_lsn);
2727 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1189 of file reorderbuffer.c.

/*
 * Body of ReorderBufferCommitChild(): record the commit of subtransaction
 * 'subxid' belonging to top-level transaction 'xid'.
 *
 * The subtransaction is looked up via ReorderBufferTXNByXid(); if no entry
 * is returned the function exits early.  Otherwise commit_lsn and end_lsn
 * are stored in the entry's final_lsn and end_lsn fields, and
 * ReorderBufferAssignChild() is called with InvalidXLogRecPtr as the LSN.
 */
1192 {
1193  ReorderBufferTXN *subtxn;
1194 
1195  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1196  InvalidXLogRecPtr, false);
1197 
1198  /*
1199  * No need to do anything if that subtxn didn't contain any changes
1200  */
1201  if (!subtxn)
1202  return;
1203 
1204  subtxn->final_lsn = commit_lsn;
1205  subtxn->end_lsn = end_lsn;
1206 
1207  /*
1208  * Assign this subxact as a child of the toplevel xact (no-op if already
1209  * done.)
1210  */
1211  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1212 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1807 of file reorderbuffer.c.

/*
 * Body of ReorderBufferCopySnap(): build and return a copy of 'orig_snap'
 * whose subxip array lists the xids of 'txn' and of its subtransactions.
 *
 * The copy is one zeroed allocation from rb->context with three parts: the
 * SnapshotData header, then orig_snap->xcnt xip entries, then room for
 * txn->nsubtxns + 1 subxip entries.  The header is copied from orig_snap
 * and then adjusted: copied = true, active_count = 1, regd_count = 0.
 * The xip entries are copied from orig_snap.  The subxip entries are not
 * taken from orig_snap; they are filled with txn->xid followed by the xid
 * of every entry on txn->subtxns, and subxcnt is set to the number written.
 * subxip is sorted with xidComparator, and curcid is set to 'cid'.
 */
1809 {
1810  Snapshot snap;
1811  dlist_iter iter;
1812  int i = 0;
1813  Size size;
1814 
1815  size = sizeof(SnapshotData) +
1816  sizeof(TransactionId) * orig_snap->xcnt +
1817  sizeof(TransactionId) * (txn->nsubtxns + 1);
1818 
1819  snap = MemoryContextAllocZero(rb->context, size);
1820  memcpy(snap, orig_snap, sizeof(SnapshotData));
1821 
1822  snap->copied = true;
1823  snap->active_count = 1; /* mark as active so nobody frees it */
1824  snap->regd_count = 0;
 /* the xip array lives in the same allocation, right after the header */
1825  snap->xip = (TransactionId *) (snap + 1);
1826 
1827  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1828 
1829  /*
1830  * snap->subxip contains all txids that belong to our transaction which we
1831  * need to check via cmin/cmax. That's why we store the toplevel
1832  * transaction in there as well.
1833  */
1834  snap->subxip = snap->xip + snap->xcnt;
1835  snap->subxip[i++] = txn->xid;
1836 
1837  /*
1838  * subxcnt isn't decreased when subtransactions abort, so count manually.
1839  * Since it's an upper boundary it is safe to use it for the allocation
1840  * above.
1841  */
1842  snap->subxcnt = 1;
1843 
1844  dlist_foreach(iter, &txn->subtxns)
1845  {
1846  ReorderBufferTXN *sub_txn;
1847 
1848  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1849  snap->subxip[i++] = sub_txn->xid;
1850  snap->subxcnt++;
1851  }
1852 
1853  /* sort so we can bsearch() later */
1854  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1855 
1856  /* store the specified current CommandId */
1857  snap->curcid = cid;
1858 
1859  return snap;
1860 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1214
#define qsort(a, b, c, d)
Definition: port.h:449
static pg_noinline void Size size
Definition: slab.c:607
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, size, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3359 of file reorderbuffer.c.

3360 {
3361  int i;
3362 
3363  for (i = 0; i < nmsgs; i++)
3364  LocalExecuteInvalidationMessage(&msgs[i]);
3365 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2817 of file reorderbuffer.c.

2822 {
2823  ReorderBufferTXN *txn;
2824  XLogRecPtr prepare_end_lsn;
2825  TimestampTz prepare_time;
2826 
2827  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2828 
2829  /* unknown transaction, nothing to do */
2830  if (txn == NULL)
2831  return;
2832 
2833  /*
2834  * By this time the txn has the prepare record information, remember it to
2835  * be later used for rollback.
2836  */
2837  prepare_end_lsn = txn->end_lsn;
2838  prepare_time = txn->xact_time.prepare_time;
2839 
2840  /* add the gid in the txn */
2841  txn->gid = pstrdup(gid);
2842 
2843  /*
2844  * It is possible that this transaction is not decoded at prepare time
2845  * either because by that time we didn't have a consistent snapshot, or
2846  * two_phase was not enabled, or it was decoded earlier but we have
2847  * restarted. We only need to send the prepare if it was not decoded
2848  * earlier. We don't need to decode the xact for aborts if it is not done
2849  * already.
2850  */
2851  if ((txn->final_lsn < two_phase_at) && is_commit)
2852  {
2853  txn->txn_flags |= RBTXN_PREPARE;
2854 
2855  /*
2856  * The prepare info must have been updated in txn even if we skip
2857  * prepare.
2858  */
2859  Assert(txn->final_lsn != InvalidXLogRecPtr);
2860 
2861  /*
2862  * By this time the txn has the prepare record information and it is
2863  * important to use that so that downstream gets the accurate
2864  * information. If instead, we have passed commit information here
2865  * then downstream can behave as it has already replayed commit
2866  * prepared after the restart.
2867  */
2868  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2869  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2870  }
2871 
2872  txn->final_lsn = commit_lsn;
2873  txn->end_lsn = end_lsn;
2874  txn->xact_time.commit_time = commit_time;
2875  txn->origin_id = origin_id;
2876  txn->origin_lsn = origin_lsn;
2877 
2878  if (is_commit)
2879  rb->commit_prepared(rb, txn, commit_lsn);
2880  else
2881  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2882 
2883  /* cleanup: make sure there's no cache pollution */
2884  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2885  txn->invalidations);
2886  ReorderBufferCleanupTXN(rb, txn);
2887 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1695
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2994 of file reorderbuffer.c.

2995 {
2996  ReorderBufferTXN *txn;
2997 
2998  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2999  false);
3000 
3001  /* unknown, nothing to forget */
3002  if (txn == NULL)
3003  return;
3004 
3005  /* this transaction mustn't be streamed */
3006  Assert(!rbtxn_is_streamed(txn));
3007 
3008  /* cosmetic... */
3009  txn->final_lsn = lsn;
3010 
3011  /*
3012  * Process cache invalidation messages if there are any. Even if we're not
3013  * interested in the transaction's contents, it could have manipulated the
3014  * catalog and we need to update the caches according to that.
3015  */
3016  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3017  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3018  txn->invalidations);
3019  else
3020  Assert(txn->ninvalidations == 0);
3021 
3022  /* remove potential on-disk data, and deallocate */
3023  ReorderBufferCleanupTXN(rb, txn);
3024 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 396 of file reorderbuffer.c.

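397 {
398  MemoryContext context = rb->context;
399 
400  /*
401  * We free separately allocated data by entirely scrapping reorderbuffer's
402  * memory context.
403  */
404  MemoryContextDelete(context);
405 
406  /* Free disk space used by unconsumed reorder buffers */
407  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
408 }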
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
tree context
Definition: radixtree.h:1833

References context, ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1866 of file reorderbuffer.c.

1867 {
1868  if (snap->copied)
1869  pfree(snap);
1870  else
1871  SnapBuildSnapDecRefcount(snap);
1872 }
void pfree(void *pointer)
Definition: mcxt.c:1520

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3409 of file reorderbuffer.c.

3410 {
3411  dlist_iter iter;
3412  TransactionId *xids = NULL;
3413  size_t xcnt = 0;
3414 
3415  /* Quick return if the list is empty */
3416  if (dclist_count(&rb->catchange_txns) == 0)
3417  return NULL;
3418 
3419  /* Initialize XID array */
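3420  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3421  dclist_count(&rb->catchange_txns));
3422  dclist_foreach(iter, &rb->catchange_txns)
3423  {
3424  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3425  catchange_node,
3426  iter.cur);
3427 
3428  Assert(rbtxn_has_catalog_changes(txn));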
3429 
3430  xids[xcnt++] = txn->xid;
3431  }
3432 
3433  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3434 
3435  Assert(xcnt == dclist_count(&rb->catchange_txns));
3436  return xids;
3437 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1014 of file reorderbuffer.c.

1015 {
1016  ReorderBufferTXN *txn;
1017 
1018  AssertTXNLsnOrder(rb);
1019 
1020  if (dlist_is_empty(&rb->toplevel_by_lsn))
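1021  return NULL;
1022 
1023  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1024 
1025  Assert(!rbtxn_is_known_subxact(txn));
1026  Assert(txn->first_lsn != InvalidXLogRecPtr);
1027  return txn;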
1028 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 1042 of file reorderbuffer.c.

1043 {
1044  ReorderBufferTXN *txn;
1045 
1046  AssertTXNLsnOrder(rb);
1047 
1048  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1049  return InvalidTransactionId;
1050 
1051  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1052  &rb->txns_by_base_snapshot_lsn);
1053  return txn->base_snapshot->xmin;
1054 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 595 of file reorderbuffer.c.

596 {
597  Oid *relids;
598  Size alloc_len;
599 
600  alloc_len = sizeof(Oid) * nrelids;
601 
602  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
603 
604  return relids;
605 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 562 of file reorderbuffer.c.

563 {
564  HeapTuple tuple;
565  Size alloc_len;
566 
567  alloc_len = tuple_len + SizeofHeapTupleHeader;
568 
569  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
570  HEAPTUPLESIZE + alloc_len);
571  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
572 
573  return tuple;
574 }
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 414 of file reorderbuffer.c.

415 {
416  ReorderBufferTXN *txn;
417 
418  txn = (ReorderBufferTXN *)
419  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
420 
421  memset(txn, 0, sizeof(ReorderBufferTXN));
422 
423  dlist_init(&txn->changes);
424  dlist_init(&txn->tuplecids);
425  dlist_init(&txn->subtxns);
426 
427  /* InvalidCommandId is not zero, so set it explicitly */
428  txn->command_id = InvalidCommandId;
429  txn->output_plugin_private = NULL;
430 
431  return txn;
432 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3066 of file reorderbuffer.c.

3068 {
3069  bool use_subtxn = IsTransactionOrTransactionBlock();
3070  int i;
3071 
3072  if (use_subtxn)
3073  BeginInternalSubTransaction("replay");
3074 
3075  /*
3076  * Force invalidations to happen outside of a valid transaction - that way
3077  * entries will just be marked as invalid without accessing the catalog.
3078  * That's advantageous because we don't need to setup the full state
3079  * necessary for catalog access.
3080  */
3081  if (use_subtxn)
3082  AbortCurrentTransaction();
3083 
3084  for (i = 0; i < ninvalidations; i++)
3085  LocalExecuteInvalidationMessage(&invalidations[i]);
3086 
3087  if (use_subtxn)
3088  RollbackAndReleaseCurrentSubTransaction();
3089 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4643
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4745
void AbortCurrentTransaction(void)
Definition: xact.c:3387

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3035 of file reorderbuffer.c.

3036 {
3037  ReorderBufferTXN *txn;
3038 
3039  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3040  false);
3041 
3042  /* unknown, nothing to do */
3043  if (txn == NULL)
3044  return;
3045 
3046  /*
3047  * Process cache invalidation messages if there are any. Even if we're not
3048  * interested in the transaction's contents, it could have manipulated the
3049  * catalog and we need to update the caches according to that.
3050  */
3051  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3052  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3053  txn->invalidations);
3054  else
3055  Assert(txn->ninvalidations == 0);
3056 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1231 of file reorderbuffer.c.

1232 {
1233  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1234  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1235  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1236 
1237  if (pos_a < pos_b)
1238  return 1;
1239  else if (pos_a == pos_b)
1240  return 0;
1241  return -1;
1242 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1474 of file reorderbuffer.c.

1476 {
1477  int32 off;
1478 
1479  for (off = 0; off < state->nr_txns; off++)
1480  {
1481  if (state->entries[off].file.vfd != -1)
1482  FileClose(state->entries[off].file.vfd);
1483  }
1484 
1485  /* free memory we might have "leaked" in the last *Next call */
1486  if (!dlist_is_empty(&state->old_change))
1487  {
1488  ReorderBufferChange *change;
1489 
1490  change = dlist_container(ReorderBufferChange, node,
1491  dlist_pop_head_node(&state->old_change));
1492  ReorderBufferReturnChange(rb, change, true);
1493  Assert(dlist_is_empty(&state->old_change));
1494  }
1495 
1496  binaryheap_free(state->heap);
1497  pfree(state);
1498 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1978
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1254 of file reorderbuffer.c.

1256 {
1257  Size nr_txns = 0;
1258  ReorderBufferIterTXNState *state;
1259  dlist_iter cur_txn_i;
1260  int32 off;
1261 
1262  *iter_state = NULL;
1263 
1264  /* Check ordering of changes in the toplevel transaction. */
1265  AssertChangeLsnOrder(txn);
1266 
1267  /*
1268  * Calculate the size of our heap: one element for every transaction that
1269  * contains changes. (Besides the transactions already in the reorder
1270  * buffer, we count the one we were directly passed.)
1271  */
1272  if (txn->nentries > 0)
1273  nr_txns++;
1274 
1275  dlist_foreach(cur_txn_i, &txn->subtxns)
1276  {
1277  ReorderBufferTXN *cur_txn;
1278 
1279  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1280 
1281  /* Check ordering of changes in this subtransaction. */
1282  AssertChangeLsnOrder(cur_txn);
1283 
1284  if (cur_txn->nentries > 0)
1285  nr_txns++;
1286  }
1287 
1288  /* allocate iteration state */
1289  state = (ReorderBufferIterTXNState *)
1290  MemoryContextAllocZero(rb->context,
1291  sizeof(ReorderBufferIterTXNState) +
1292  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1293 
1294  state->nr_txns = nr_txns;
1295  dlist_init(&state->old_change);
1296 
1297  for (off = 0; off < state->nr_txns; off++)
1298  {
1299  state->entries[off].file.vfd = -1;
1300  state->entries[off].segno = 0;
1301  }
1302 
1303  /* allocate heap */
1304  state->heap = binaryheap_allocate(state->nr_txns,
1305  ReorderBufferIterCompare,
1306  state);
1307 
1308  /* Now that the state fields are initialized, it is safe to return it. */
1309  *iter_state = state;
1310 
1311  /*
1312  * Now insert items into the binary heap, in an unordered fashion. (We
1313  * will run a heap assembly step at the end; this is more efficient.)
1314  */
1315 
1316  off = 0;
1317 
1318  /* add toplevel transaction if it contains changes */
1319  if (txn->nentries > 0)
1320  {
1321  ReorderBufferChange *cur_change;
1322 
1323  if (rbtxn_is_serialized(txn))
1324  {
1325  /* serialize remaining changes */
1326  ReorderBufferSerializeTXN(rb, txn);
1327  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1328  &state->entries[off].segno);
1329  }
1330 
1331  cur_change = dlist_head_element(ReorderBufferChange, node,
1332  &txn->changes);
1333 
1334  state->entries[off].lsn = cur_change->lsn;
1335  state->entries[off].change = cur_change;
1336  state->entries[off].txn = txn;
1337 
1338  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1339  }
1340 
1341  /* add subtransactions if they contain changes */
1342  dlist_foreach(cur_txn_i, &txn->subtxns)
1343  {
1344  ReorderBufferTXN *cur_txn;
1345 
1346  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1347 
1348  if (cur_txn->nentries > 0)
1349  {
1350  ReorderBufferChange *cur_change;
1351 
1352  if (rbtxn_is_serialized(cur_txn))
1353  {
1354  /* serialize remaining changes */
1355  ReorderBufferSerializeTXN(rb, cur_txn);
1356  ReorderBufferRestoreChanges(rb, cur_txn,
1357  &state->entries[off].file,
1358  &state->entries[off].segno);
1359  }
1360  cur_change = dlist_head_element(ReorderBufferChange, node,
1361  &cur_txn->changes);
1362 
1363  state->entries[off].lsn = cur_change->lsn;
1364  state->entries[off].change = cur_change;
1365  state->entries[off].txn = cur_txn;
1366 
1367  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1368  }
1369  }
1370 
1371  /* assemble a valid binary heap */
1372  binaryheap_build(state->heap);
1373 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1382 of file reorderbuffer.c.

1383 {
1384  ReorderBufferChange *change;
1385  ReorderBufferIterTXNEntry *entry;
1386  int32 off;
1387 
1388  /* nothing there anymore */
1389  if (state->heap->bh_size == 0)
1390  return NULL;
1391 
1392  off = DatumGetInt32(binaryheap_first(state->heap));
1393  entry = &state->entries[off];
1394 
1395  /* free memory we might have "leaked" in the previous *Next call */
1396  if (!dlist_is_empty(&state->old_change))
1397  {
1398  change = dlist_container(ReorderBufferChange, node,
1399  dlist_pop_head_node(&state->old_change));
1400  ReorderBufferReturnChange(rb, change, true);
1401  Assert(dlist_is_empty(&state->old_change));
1402  }
1403 
1404  change = entry->change;
1405 
1406  /*
1407  * update heap with information about which transaction has the next
1408  * relevant change in LSN order
1409  */
1410 
1411  /* there are in-memory changes */
1412  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1413  {
1414  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1415  ReorderBufferChange *next_change =
1416  dlist_container(ReorderBufferChange, node, next);
1417 
1418  /* txn stays the same */
1419  state->entries[off].lsn = next_change->lsn;
1420  state->entries[off].change = next_change;
1421 
1422  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1423  return change;
1424  }
1425 
1426  /* try to load changes from disk */
1427  if (entry->txn->nentries != entry->txn->nentries_mem)
1428  {
1429  /*
1430  * Ugly: restoring changes will reuse *Change records, thus delete the
1431  * current one from the per-tx list and only free in the next call.
1432  */
1433  dlist_delete(&change->node);
1434  dlist_push_tail(&state->old_change, &change->node);
1435 
1436  /*
1437  * Update the total bytes processed by the txn for which we are
1438  * releasing the current set of changes and restoring the new set of
1439  * changes.
1440  */
1441  rb->totalBytes += entry->txn->size;
1442  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1443  &state->entries[off].segno))
1444  {
1445  /* successfully restored changes from disk */
1446  ReorderBufferChange *next_change =
1447  dlist_head_element(ReorderBufferChange, node,
1448  &entry->txn->changes);
1449 
1450  elog(DEBUG2, "restored %u/%u changes from disk",
1451  (uint32) entry->txn->nentries_mem,
1452  (uint32) entry->txn->nentries);
1453 
1454  Assert(entry->txn->nentries_mem);
1455  /* txn stays the same */
1456  state->entries[off].lsn = next_change->lsn;
1457  state->entries[off].change = next_change;
1458  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1459 
1460  return change;
1461  }
1462  }
1463 
1464  /* ok, no changes there anymore, remove */
1465  binaryheap_remove_first(state->heap);
1466 
1467  return change;
1468 }
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:221
unsigned int uint32
Definition: c.h:506
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3564 of file reorderbuffer.c.

3565 {
3566  dlist_iter iter;
3567  Size largest_size = 0;
3568  ReorderBufferTXN *largest = NULL;
3569 
3570  /* Find the largest top-level transaction having a base snapshot. */
3571  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3572  {
3573  ReorderBufferTXN *txn;
3574 
3575  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3576 
3577  /* must not be a subtxn */
3578  Assert(!rbtxn_is_known_subxact(txn));
3579  /* base_snapshot must be set */
3580  Assert(txn->base_snapshot != NULL);
3581 
3582  if ((largest == NULL || txn->total_size > largest_size) &&
3583  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3584  rbtxn_has_streamable_change(txn))
3585  {
3586  largest = txn;
3587  largest_size = txn->total_size;
3588  }
3589  }
3590 
3591  return largest;
3592 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3524 of file reorderbuffer.c.

3525 {
3526  ReorderBufferTXN *largest;
3527 
3528  /* Get the largest transaction from the max-heap */
3529  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3530  pairingheap_first(rb->txn_heap));
3531 
3532  Assert(largest);
3533  Assert(largest->size > 0);
3534  Assert(largest->size <= rb->size);
3535 
3536  return largest;
3537 }
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:130
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert, pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2780 of file reorderbuffer.c.

2782 {
2783  ReorderBufferTXN *txn;
2784 
2785  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2786  false);
2787 
2788  /* unknown transaction, nothing to replay */
2789  if (txn == NULL)
2790  return;
2791 
2792  txn->txn_flags |= RBTXN_PREPARE;
2793  txn->gid = pstrdup(gid);
2794 
2795  /* The prepare info must have been updated in txn by now. */
2796  Assert(txn->final_lsn != InvalidXLogRecPtr);
2797 
2798  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2799  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2800 
2801  /*
2802  * We send the prepare for the concurrently aborted xacts so that later
2803  * when rollback prepared is decoded and sent, the downstream should be
2804  * able to rollback such a xact. See comments atop DecodePrepare.
2805  *
2806  * Note, for the concurrent_abort + streaming case a stream_prepare was
2807  * already sent within the ReorderBufferReplay call above.
2808  */
2809  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2810  rb->prepare(rb, txn, txn->final_lsn);
2811 }
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 711 of file reorderbuffer.c.

714 {
715  ReorderBufferTXN *toptxn;
716 
717  /*
718  * The partial changes need to be processed only while streaming
719  * in-progress transactions.
720  */
721  if (!ReorderBufferCanStream(rb))
722  return;
723 
724  /* Get the top transaction. */
725  toptxn = rbtxn_get_toptxn(txn);
726 
727  /*
728  * Indicate a partial change for toast inserts. The change will be
729  * considered as complete once we get the insert or update on the main
730  * table and we are sure that the pending toast chunks are not required
731  * anymore.
732  *
733  * If we allow streaming when there are pending toast chunks then such
734  * chunks won't be released till the insert (multi_insert) is complete and
735  * we expect the txn to have streamed all changes after streaming. This
736  * restriction is mainly to ensure the correctness of streamed
737  * transactions and it doesn't seem worth uplifting such a restriction
738  * just to allow this case because anyway we will stream the transaction
739  * once such an insert is complete.
740  */
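741  if (toast_insert)
742  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
743  else if (rbtxn_has_partial_change(toptxn) &&
744  IsInsertOrUpdate(change->action) &&
745  change->data.tp.clear_toast_afterwards)
746  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;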
747 
748  /*
749  * Indicate a partial change for speculative inserts. The change will be
750  * considered as complete once we get the speculative confirm or abort
751  * token.
752  */
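753  if (IsSpecInsert(change->action))
754  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
755  else if (rbtxn_has_partial_change(toptxn) &&
756  IsSpecConfirmOrAbort(change->action))
757  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;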
758 
759  /*
760  * Stream the transaction if it is serialized before and the changes are
761  * now complete in the top-level transaction.
762  *
763  * The reason for doing the streaming of such a transaction as soon as we
764  * get the complete change for it is that previously it would have reached
765  * the memory threshold and wouldn't get streamed because of incomplete
766  * changes. Delaying such transactions would increase apply lag for them.
767  */
768  if (ReorderBufferCanStartStreaming(rb) &&
769  !(rbtxn_has_partial_change(toptxn)) &&
770  rbtxn_is_serialized(txn) &&
771  rbtxn_has_streamable_change(toptxn))
772  ReorderBufferStreamTXN(rb, toptxn);
773 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2080 of file reorderbuffer.c.

2085 {
2086  bool using_subtxn;
2088  ReorderBufferIterTXNState *volatile iterstate = NULL;
2089  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2090  ReorderBufferChange *volatile specinsert = NULL;
2091  volatile bool stream_started = false;
2092  ReorderBufferTXN *volatile curtxn = NULL;
2093 
2094  /* build data to be able to lookup the CommandIds of catalog tuples */
2096 
2097  /* setup the initial snapshot */
2098  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2099 
2100  /*
2101  * Decoding needs access to syscaches et al., which in turn use
2102  * heavyweight locks and such. Thus we need to have enough state around to
2103  * keep track of those. The easiest way is to simply use a transaction
2104  * internally. That also allows us to easily enforce that nothing writes
2105  * to the database by checking for xid assignments.
2106  *
2107  * When we're called via the SQL SRF there's already a transaction
2108  * started, so start an explicit subtransaction there.
2109  */
2110  using_subtxn = IsTransactionOrTransactionBlock();
2111 
2112  PG_TRY();
2113  {
2114  ReorderBufferChange *change;
2115  int changes_count = 0; /* used to accumulate the number of
2116  * changes */
2117 
2118  if (using_subtxn)
2119  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2120  else
2122 
2123  /*
2124  * We only need to send begin/begin-prepare for non-streamed
2125  * transactions.
2126  */
2127  if (!streaming)
2128  {
2129  if (rbtxn_prepared(txn))
2130  rb->begin_prepare(rb, txn);
2131  else
2132  rb->begin(rb, txn);
2133  }
2134 
2135  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2136  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2137  {
2138  Relation relation = NULL;
2139  Oid reloid;
2140 
2142 
2143  /*
2144  * We can't call start stream callback before processing first
2145  * change.
2146  */
2147  if (prev_lsn == InvalidXLogRecPtr)
2148  {
2149  if (streaming)
2150  {
2151  txn->origin_id = change->origin_id;
2152  rb->stream_start(rb, txn, change->lsn);
2153  stream_started = true;
2154  }
2155  }
2156 
2157  /*
2158  * Enforce correct ordering of changes, merged from multiple
2159  * subtransactions. The changes may have the same LSN due to
2160  * MULTI_INSERT xlog records.
2161  */
2162  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2163 
2164  prev_lsn = change->lsn;
2165 
2166  /*
2167  * Set the current xid to detect concurrent aborts. This is
2168  * required for the cases when we decode the changes before the
2169  * COMMIT record is processed.
2170  */
2171  if (streaming || rbtxn_prepared(change->txn))
2172  {
2173  curtxn = change->txn;
2174  SetupCheckXidLive(curtxn->xid);
2175  }
2176 
2177  switch (change->action)
2178  {
2180 
2181  /*
2182  * Confirmation for speculative insertion arrived. Simply
2183  * use as a normal record. It'll be cleaned up at the end
2184  * of INSERT processing.
2185  */
2186  if (specinsert == NULL)
2187  elog(ERROR, "invalid ordering of speculative insertion changes");
2188  Assert(specinsert->data.tp.oldtuple == NULL);
2189  change = specinsert;
2191 
2192  /* intentionally fall through */
2196  Assert(snapshot_now);
2197 
2198  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2199  change->data.tp.rlocator.relNumber);
2200 
2201  /*
2202  * Mapped catalog tuple without data, emitted while
2203  * catalog table was in the process of being rewritten. We
2204  * can fail to look up the relfilenumber, because the
2205  * relmapper has no "historic" view, in contrast to the
2206  * normal catalog during decoding. Thus repeated rewrites
2207  * can cause a lookup failure. That's OK because we do not
2208  * decode catalog changes anyway. Normally such tuples
2209  * would be skipped over below, but we can't identify
2210  * whether the table should be logically logged without
2211  * mapping the relfilenumber to the oid.
2212  */
2213  if (reloid == InvalidOid &&
2214  change->data.tp.newtuple == NULL &&
2215  change->data.tp.oldtuple == NULL)
2216  goto change_done;
2217  else if (reloid == InvalidOid)
2218  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2219  relpathperm(change->data.tp.rlocator,
2220  MAIN_FORKNUM));
2221 
2222  relation = RelationIdGetRelation(reloid);
2223 
2224  if (!RelationIsValid(relation))
2225  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2226  reloid,
2227  relpathperm(change->data.tp.rlocator,
2228  MAIN_FORKNUM));
2229 
2230  if (!RelationIsLogicallyLogged(relation))
2231  goto change_done;
2232 
2233  /*
2234  * Ignore temporary heaps created during DDL unless the
2235  * plugin has asked for them.
2236  */
2237  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2238  goto change_done;
2239 
2240  /*
2241  * For now ignore sequence changes entirely. Most of the
2242  * time they don't log changes using records we
2243  * understand, so it doesn't make sense to handle the few
2244  * cases we do.
2245  */
2246  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2247  goto change_done;
2248 
2249  /* user-triggered change */
2250  if (!IsToastRelation(relation))
2251  {
2252  ReorderBufferToastReplace(rb, txn, relation, change);
2253  ReorderBufferApplyChange(rb, txn, relation, change,
2254  streaming);
2255 
2256  /*
2257  * Only clear reassembled toast chunks if we're sure
2258  * they're not required anymore. The creator of the
2259  * tuple tells us.
2260  */
2261  if (change->data.tp.clear_toast_afterwards)
2262  ReorderBufferToastReset(rb, txn);
2263  }
2264  /* we're not interested in toast deletions */
2265  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2266  {
2267  /*
2268  * Need to reassemble the full toasted Datum in
2269  * memory. To ensure the chunks don't get reused till
2270  * we're done, remove it from the list of this
2271  * transaction's changes. Otherwise it will get
2272  * freed/reused while restoring spooled data from
2273  * disk.
2274  */
2275  Assert(change->data.tp.newtuple != NULL);
2276 
2277  dlist_delete(&change->node);
2278  ReorderBufferToastAppendChunk(rb, txn, relation,
2279  change);
2280  }
2281 
2282  change_done:
2283 
2284  /*
2285  * If speculative insertion was confirmed, the record
2286  * isn't needed anymore.
2287  */
2288  if (specinsert != NULL)
2289  {
2290  ReorderBufferReturnChange(rb, specinsert, true);
2291  specinsert = NULL;
2292  }
2293 
2294  if (RelationIsValid(relation))
2295  {
2296  RelationClose(relation);
2297  relation = NULL;
2298  }
2299  break;
2300 
2302 
2303  /*
2304  * Speculative insertions are dealt with by delaying the
2305  * processing of the insert until the confirmation record
2306  * arrives. For that we simply unlink the record from the
2307  * chain, so it does not get freed/reused while restoring
2308  * spooled data from disk.
2309  *
2310  * This is safe in the face of concurrent catalog changes
2311  * because the relevant relation can't be changed between
2312  * speculative insertion and confirmation due to
2313  * CheckTableNotInUse() and locking.
2314  */
2315 
2316  /* clear out a pending (and thus failed) speculation */
2317  if (specinsert != NULL)
2318  {
2319  ReorderBufferReturnChange(rb, specinsert, true);
2320  specinsert = NULL;
2321  }
2322 
2323  /* and memorize the pending insertion */
2324  dlist_delete(&change->node);
2325  specinsert = change;
2326  break;
2327 
2329 
2330  /*
2331  * Abort for speculative insertion arrived. So cleanup the
2332  * specinsert tuple and toast hash.
2333  *
2334  * Note that we get the spec abort change for each toast
2335  * entry but we need to perform the cleanup only the first
2336  * time we get it for the main table.
2337  */
2338  if (specinsert != NULL)
2339  {
2340  /*
2341  * We must clean the toast hash before processing a
2342  * completely new tuple to avoid confusion about the
2343  * previous tuple's toast chunks.
2344  */
2345  Assert(change->data.tp.clear_toast_afterwards);
2346  ReorderBufferToastReset(rb, txn);
2347 
2348  /* We don't need this record anymore. */
2349  ReorderBufferReturnChange(rb, specinsert, true);
2350  specinsert = NULL;
2351  }
2352  break;
2353 
2355  {
2356  int i;
2357  int nrelids = change->data.truncate.nrelids;
2358  int nrelations = 0;
2359  Relation *relations;
2360 
2361  relations = palloc0(nrelids * sizeof(Relation));
2362  for (i = 0; i < nrelids; i++)
2363  {
2364  Oid relid = change->data.truncate.relids[i];
2365  Relation rel;
2366 
2367  rel = RelationIdGetRelation(relid);
2368 
2369  if (!RelationIsValid(rel))
2370  elog(ERROR, "could not open relation with OID %u", relid);
2371 
2372  if (!RelationIsLogicallyLogged(rel))
2373  continue;
2374 
2375  relations[nrelations++] = rel;
2376  }
2377 
2378  /* Apply the truncate. */
2379  ReorderBufferApplyTruncate(rb, txn, nrelations,
2380  relations, change,
2381  streaming);
2382 
2383  for (i = 0; i < nrelations; i++)
2384  RelationClose(relations[i]);
2385 
2386  break;
2387  }
2388 
2390  ReorderBufferApplyMessage(rb, txn, change, streaming);
2391  break;
2392 
2394  /* Execute the invalidation messages locally */
2395  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2396  change->data.inval.invalidations);
2397  break;
2398 
2400  /* get rid of the old */
2401  TeardownHistoricSnapshot(false);
2402 
2403  if (snapshot_now->copied)
2404  {
2405  ReorderBufferFreeSnap(rb, snapshot_now);
2406  snapshot_now =
2407  ReorderBufferCopySnap(rb, change->data.snapshot,
2408  txn, command_id);
2409  }
2410 
2411  /*
2412  * Restored from disk, need to be careful not to double
2413  * free. We could introduce refcounting for that, but for
2414  * now this seems infrequent enough not to care.
2415  */
2416  else if (change->data.snapshot->copied)
2417  {
2418  snapshot_now =
2419  ReorderBufferCopySnap(rb, change->data.snapshot,
2420  txn, command_id);
2421  }
2422  else
2423  {
2424  snapshot_now = change->data.snapshot;
2425  }
2426 
2427  /* and continue with the new one */
2428  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2429  break;
2430 
2432  Assert(change->data.command_id != InvalidCommandId);
2433 
2434  if (command_id < change->data.command_id)
2435  {
2436  command_id = change->data.command_id;
2437 
2438  if (!snapshot_now->copied)
2439  {
2440  /* we don't use the global one anymore */
2441  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2442  txn, command_id);
2443  }
2444 
2445  snapshot_now->curcid = command_id;
2446 
2447  TeardownHistoricSnapshot(false);
2448  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2449  }
2450 
2451  break;
2452 
2454  elog(ERROR, "tuplecid value in changequeue");
2455  break;
2456  }
2457 
2458  /*
2459  * It is possible that the data is not sent to downstream for a
2460  * long time either because the output plugin filtered it or there
2461  * is a DDL that generates a lot of data that is not processed by
2462  * the plugin. So, in such cases, the downstream can timeout. To
2463  * avoid that we try to send a keepalive message if required.
2464  * Trying to send a keepalive message after every change has some
2465  * overhead, but testing showed there is no noticeable overhead if
2466  * we do it after every ~100 changes.
2467  */
2468 #define CHANGES_THRESHOLD 100
2469 
2470  if (++changes_count >= CHANGES_THRESHOLD)
2471  {
2472  rb->update_progress_txn(rb, txn, change->lsn);
2473  changes_count = 0;
2474  }
2475  }
2476 
2477  /* speculative insertion record must be freed by now */
2478  Assert(!specinsert);
2479 
2480  /* clean up the iterator */
2481  ReorderBufferIterTXNFinish(rb, iterstate);
2482  iterstate = NULL;
2483 
2484  /*
2485  * Update total transaction count and total bytes processed by the
2486  * transaction and its subtransactions. Ensure to not count the
2487  * streamed transaction multiple times.
2488  *
2489  * Note that the statistics computation has to be done after
2490  * ReorderBufferIterTXNFinish as it releases the serialized change
2491  * which we have already accounted in ReorderBufferIterTXNNext.
2492  */
2493  if (!rbtxn_is_streamed(txn))
2494  rb->totalTxns++;
2495 
2496  rb->totalBytes += txn->total_size;
2497 
2498  /*
2499  * Done with current changes, send the last message for this set of
2500  * changes depending upon streaming mode.
2501  */
2502  if (streaming)
2503  {
2504  if (stream_started)
2505  {
2506  rb->stream_stop(rb, txn, prev_lsn);
2507  stream_started = false;
2508  }
2509  }
2510  else
2511  {
2512  /*
2513  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2514  * regular ones).
2515  */
2516  if (rbtxn_prepared(txn))
2517  rb->prepare(rb, txn, commit_lsn);
2518  else
2519  rb->commit(rb, txn, commit_lsn);
2520  }
2521 
2522  /* this is just a sanity check against bad output plugin behaviour */
2524  elog(ERROR, "output plugin used XID %u",
2526 
2527  /*
2528  * Remember the command ID and snapshot for the next set of changes in
2529  * streaming mode.
2530  */
2531  if (streaming)
2532  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2533  else if (snapshot_now->copied)
2534  ReorderBufferFreeSnap(rb, snapshot_now);
2535 
2536  /* cleanup */
2537  TeardownHistoricSnapshot(false);
2538 
2539  /*
2540  * Aborting the current (sub-)transaction as a whole has the right
2541  * semantics. We want all locks acquired in here to be released, not
2542  * reassigned to the parent, and we do not want any database access
2543  * to have persistent effects.
2544  */
2546 
2547  /* make sure there's no cache pollution */
2549 
2550  if (using_subtxn)
2552 
2553  /*
2554  * We are here due to one of the four reasons: 1. Decoding an
2555  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2556  * prepared txn that was (partially) streamed. 4. Decoding a committed
2557  * txn.
2558  *
2559  * For 1, we allow truncation of txn data by removing the changes
2560  * already streamed but still keeping other things like invalidations,
2561  * snapshot, and tuplecids. For 2 and 3, we indicate
2562  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2563  * data as the entire transaction has been decoded except for commit.
2564  * For 4, as the entire txn has been decoded, we can fully clean up
2565  * the TXN reorder buffer.
2566  */
2567  if (streaming || rbtxn_prepared(txn))
2568  {
2570  /* Reset the CheckXidAlive */
2572  }
2573  else
2574  ReorderBufferCleanupTXN(rb, txn);
2575  }
2576  PG_CATCH();
2577  {
2578  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2579  ErrorData *errdata = CopyErrorData();
2580 
2581  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2582  if (iterstate)
2583  ReorderBufferIterTXNFinish(rb, iterstate);
2584 
2586 
2587  /*
2588  * Force cache invalidation to happen outside of a valid transaction
2589  * to prevent catalog access as we just caught an error.
2590  */
2592 
2593  /* make sure there's no cache pollution */
2595  txn->invalidations);
2596 
2597  if (using_subtxn)
2599 
2600  /*
2601  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2602  * abort of the (sub)transaction we are streaming or preparing. We
2603  * need to do the cleanup and return gracefully on this error, see
2604  * SetupCheckXidLive.
2605  *
2606  * This error code can be thrown by one of the callbacks we call
2607  * during decoding so we need to ensure that we return gracefully only
2608  * when we are sending the data in streaming mode and the streaming is
2609  * not finished yet or when we are sending the data out on a PREPARE
2610  * during a two-phase commit.
2611  */
2612  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2613  (stream_started || rbtxn_prepared(txn)))
2614  {
2615  /* curtxn must be set for streaming or prepared transactions */
2616  Assert(curtxn);
2617 
2618  /* Cleanup the temporary error state. */
2619  FlushErrorState();
2620  FreeErrorData(errdata);
2621  errdata = NULL;
2622  curtxn->concurrent_abort = true;
2623 
2624  /* Reset the TXN so that it is allowed to stream remaining data. */
2625  ReorderBufferResetTXN(rb, txn, snapshot_now,
2626  command_id, prev_lsn,
2627  specinsert);
2628  }
2629  else
2630  {
2631  ReorderBufferCleanupTXN(rb, txn);
2632  MemoryContextSwitchTo(ecxt);
2633  PG_RE_THROW();
2634  }
2635  }
2636  PG_END_TRY();
2637 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:145
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1787
void FlushErrorState(void)
Definition: elog.c:1836
ErrorData * CopyErrorData(void)
Definition: elog.c:1731
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
void * palloc0(Size size)
Definition: mcxt.c:1346
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RelationIsValid(relation)
Definition: rel.h:478
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2062
void RelationClose(Relation relation)
Definition: relcache.c:2193
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:50
#define relpathperm(rlocator, forknum)
Definition: relpath.h:90
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
int sqlerrcode
Definition: elog.h:438
Form_pg_class rd_rel
Definition: rel.h:111
RepOriginId origin_id
Definition: reorderbuffer.h:81
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:97
void StartTransactionCommand(void)
Definition: xact.c:2995
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:468
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), 
ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3102 of file reorderbuffer.c.

3103 {
3104  /* many records won't have an xid assigned, centralize check here */
3105  if (xid != InvalidTransactionId)
3106  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3107 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 780 of file reorderbuffer.c.

782 {
783  ReorderBufferTXN *txn;
784 
785  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
786 
787  /*
788  * While streaming the previous changes we have detected that the
789  * transaction is aborted. So there is no point in collecting further
790  * changes for it.
791  */
792  if (txn->concurrent_abort)
793  {
794  /*
795  * We don't need to update memory accounting for this change as we
796  * have not added it to the queue yet.
797  */
798  ReorderBufferReturnChange(rb, change, false);
799  return;
800  }
801 
802  /*
803  * The changes that are sent downstream are considered streamable. We
804  * remember such transactions so that only those will later be considered
805  * for streaming.
806  */
807  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
813  {
814  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
815 
817  }
818 
819  change->lsn = lsn;
820  change->txn = txn;
821 
822  Assert(InvalidXLogRecPtr != lsn);
823  dlist_push_tail(&txn->changes, &change->node);
824  txn->nentries++;
825  txn->nentries_mem++;
826 
827  /* update memory accounting information */
828  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
829  ReorderBufferChangeSize(change));
830 
831  /* process partial change */
832  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
833 
834  /* check the memory limits and evict something if needed */
835  ReorderBufferCheckMemoryLimit(rb);
836 }
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 843 of file reorderbuffer.c.

847 {
848  if (transactional)
849  {
850  MemoryContext oldcontext;
851  ReorderBufferChange *change;
852 
854 
855  /*
856  * We don't expect snapshots for transactional changes - we'll use the
857  * snapshot derived later during apply (unless the change gets
858  * skipped).
859  */
860  Assert(!snap);
861 
862  oldcontext = MemoryContextSwitchTo(rb->context);
863 
864  change = ReorderBufferGetChange(rb);
866  change->data.msg.prefix = pstrdup(prefix);
867  change->data.msg.message_size = message_size;
868  change->data.msg.message = palloc(message_size);
869  memcpy(change->data.msg.message, message, message_size);
870 
871  ReorderBufferQueueChange(rb, xid, lsn, change, false);
872 
873  MemoryContextSwitchTo(oldcontext);
874  }
875  else
876  {
877  ReorderBufferTXN *txn = NULL;
878  volatile Snapshot snapshot_now = snap;
879 
880  /* Non-transactional changes require a valid snapshot. */
881  Assert(snapshot_now);
882 
883  if (xid != InvalidTransactionId)
884  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
885 
886  /* setup snapshot to allow catalog access */
887  SetupHistoricSnapshot(snapshot_now, NULL);
888  PG_TRY();
889  {
890  rb->message(rb, txn, lsn, false, prefix, message_size, message);
891 
893  }
894  PG_CATCH();
895  {
897  PG_RE_THROW();
898  }
899  PG_END_TRY();
900  }
901 }

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2733 of file reorderbuffer.c.

2737 {
2738  ReorderBufferTXN *txn;
2739 
2740  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2741 
2742  /* unknown transaction, nothing to do */
2743  if (txn == NULL)
2744  return false;
2745 
2746  /*
2747  * Remember the prepare information to be later used by commit prepared in
2748  * case we skip doing prepare.
2749  */
2750  txn->final_lsn = prepare_lsn;
2751  txn->end_lsn = end_lsn;
2752  txn->xact_time.prepare_time = prepare_time;
2753  txn->origin_id = origin_id;
2754  txn->origin_lsn = origin_lsn;
2755 
2756  return true;
2757 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2650 of file reorderbuffer.c.

2655 {
2656  Snapshot snapshot_now;
2657  CommandId command_id = FirstCommandId;
2658 
2659  txn->final_lsn = commit_lsn;
2660  txn->end_lsn = end_lsn;
2661  txn->xact_time.commit_time = commit_time;
2662  txn->origin_id = origin_id;
2663  txn->origin_lsn = origin_lsn;
2664 
2665  /*
2666  * If the transaction was (partially) streamed, we need to commit it in a
2667  * 'streamed' way. That is, we first stream the remaining part of the
2668  * transaction, and then invoke stream_commit message.
2669  *
2670  * Called after everything (origin ID, LSN, ...) is stored in the
2671  * transaction to avoid passing that information directly.
2672  */
2673  if (rbtxn_is_streamed(txn))
2674  {
2675  ReorderBufferStreamCommit(rb, txn);
2676  return;
2677  }
2678 
2679  /*
2680  * If this transaction has no snapshot, it didn't make any changes to the
2681  * database, so there's nothing to decode. Note that
2682  * ReorderBufferCommitChild will have transferred any snapshots from
2683  * subtransactions if there were any.
2684  */
2685  if (txn->base_snapshot == NULL)
2686  {
2687  Assert(txn->ninvalidations == 0);
2688 
2689  /*
2690  * Removing this txn before a commit might result in the computation
2691  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2692  */
2693  if (!rbtxn_prepared(txn))
2694  ReorderBufferCleanupTXN(rb, txn);
2695  return;
2696  }
2697 
2698  snapshot_now = txn->base_snapshot;
2699 
2700  /* Process and send the changes to output plugin. */
2701  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2702  command_id, false);
2703 }
#define FirstCommandId
Definition: c.h:668
uint32 CommandId
Definition: c.h:666
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *specinsert 
)
static

Definition at line 2037 of file reorderbuffer.c.

2042 {
2043  /* Discard the changes that we just streamed */
2044  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2045 
2046  /* Free all resources allocated for toast reconstruction */
2047  ReorderBufferToastReset(rb, txn);
2048 
2049  /* Return the spec insert change if it is not NULL */
2050  if (specinsert != NULL)
2051  {
2052  ReorderBufferReturnChange(rb, specinsert, true);
2053  specinsert = NULL;
2054  }
2055 
2056  /*
2057  * For the streaming case, stop the stream and remember the command ID and
2058  * snapshot for the streaming run.
2059  */
2060  if (rbtxn_is_streamed(txn))
2061  {
2062  rb->stream_stop(rb, txn, last_lsn);
2063  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2064  }
2065 }

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
char *  data 
)
static

Definition at line 4371 of file reorderbuffer.c.

4373 {
4374  ReorderBufferDiskChange *ondisk;
4375  ReorderBufferChange *change;
4376 
4377  ondisk = (ReorderBufferDiskChange *) data;
4378 
4379  change = ReorderBufferGetChange(rb);
4380 
4381  /* copy static part */
4382  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4383 
4384  data += sizeof(ReorderBufferDiskChange);
4385 
4386  /* restore individual stuff */
4387  switch (change->action)
4388  {
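4389  /* fall through these, they're all similar enough */
4390  case REORDER_BUFFER_CHANGE_INSERT:
4391  case REORDER_BUFFER_CHANGE_UPDATE:
4392  case REORDER_BUFFER_CHANGE_DELETE:
4393  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: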
4394  if (change->data.tp.oldtuple)
4395  {
4396  uint32 tuplelen = ((HeapTuple) data)->t_len;
4397 
4398  change->data.tp.oldtuple =
4399  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4400 
4401  /* restore ->tuple */
4402  memcpy(change->data.tp.oldtuple, data,
4403  sizeof(HeapTupleData));
4404  data += sizeof(HeapTupleData);
4405 
4406  /* reset t_data pointer into the new tuplebuf */
4407  change->data.tp.oldtuple->t_data =
4408  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4409 
4410  /* restore tuple data itself */
4411  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4412  data += tuplelen;
4413  }
4414 
4415  if (change->data.tp.newtuple)
4416  {
4417  /* here, data might not be suitably aligned! */
4418  uint32 tuplelen;
4419 
4420  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4421  sizeof(uint32));
4422 
4423  change->data.tp.newtuple =
4424  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4425 
4426  /* restore ->tuple */
4427  memcpy(change->data.tp.newtuple, data,
4428  sizeof(HeapTupleData));
4429  data += sizeof(HeapTupleData);
4430 
4431  /* reset t_data pointer into the new tuplebuf */
4432  change->data.tp.newtuple->t_data =
4433  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4434 
4435  /* restore tuple data itself */
4436  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4437  data += tuplelen;
4438  }
4439 
4440  break;
4441  case REORDER_BUFFER_CHANGE_MESSAGE:
4442  {
4443  Size prefix_size;
4444 
4445  /* read prefix */
4446  memcpy(&prefix_size, data, sizeof(Size));
4447  data += sizeof(Size);
4448  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4449  prefix_size);
4450  memcpy(change->data.msg.prefix, data, prefix_size);
4451  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4452  data += prefix_size;
4453 
4454  /* read the message */
4455  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4456  data += sizeof(Size);
4457  change->data.msg.message = MemoryContextAlloc(rb->context,
4458  change->data.msg.message_size);
4459  memcpy(change->data.msg.message, data,
4460  change->data.msg.message_size);
4461  data += change->data.msg.message_size;
4462 
4463  break;
4464  }
4465  case REORDER_BUFFER_CHANGE_INVALIDATION:
4466  {
4467  Size inval_size = sizeof(SharedInvalidationMessage) *
4468  change->data.inval.ninvalidations;
4469 
4470  change->data.inval.invalidations =
4471  MemoryContextAlloc(rb->context, inval_size);
4472 
4473  /* read the message */
4474  memcpy(change->data.inval.invalidations, data, inval_size);
4475 
4476  break;
4477  }
4478  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4479  {
4480  Snapshot oldsnap;
4481  Snapshot newsnap;
4482  Size size;
4483 
4484  oldsnap = (Snapshot) data;
4485 
4486  size = sizeof(SnapshotData) +
4487  sizeof(TransactionId) * oldsnap->xcnt +
4488  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4489 
4490  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4491 
4492  newsnap = change->data.snapshot;
4493 
4494  memcpy(newsnap, data, size);
4495  newsnap->xip = (TransactionId *)
4496  (((char *) newsnap) + sizeof(SnapshotData));
4497  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4498  newsnap->copied = true;
4499  break;
4500  }
4501  /* the base struct contains all the data, easy peasy */
4502  case REORDER_BUFFER_CHANGE_TRUNCATE:
4503  {
4504  Oid *relids;
4505 
4506  relids = ReorderBufferGetRelids(rb,
4507  change->data.truncate.nrelids);
4508  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4509  change->data.truncate.relids = relids;
4510 
4511  break;
4512  }
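4513  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4514  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4515  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4516  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4517  break;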
4518  }
4519 
4520  dlist_push_tail(&txn->changes, &change->node);
4521  txn->nentries_mem++;
4522 
4523  /*
4524  * Update memory accounting for the restored change. We need to do this
4525  * although we don't check the memory limit when restoring the changes in
4526  * this branch (we only do that when initially queueing the changes after
4527  * decoding), because we will release the changes later, and that will
4528  * update the accounting too (subtracting the size from the counters). And
4529  * we don't want to underflow there.
4530  */
4531  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4532  ReorderBufferChangeSize(change));
4533 }
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), size, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
TXNEntryFile *file,
XLogSegNo *segno 
)
static

Definition at line 4228 of file reorderbuffer.c.

4230 {
4231  Size restored = 0;
4232  XLogSegNo last_segno;
4233  dlist_mutable_iter cleanup_iter;
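4234  File *fd = &file->vfd;
4235 
4236  Assert(txn->first_lsn != InvalidXLogRecPtr);
4237  Assert(txn->final_lsn != InvalidXLogRecPtr);
4238 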
4239  /* free current entries, so we have memory for more */
4240  dlist_foreach_modify(cleanup_iter, &txn->changes)
4241  {
4242  ReorderBufferChange *cleanup =
4243  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4244 
4245  dlist_delete(&cleanup->node);
4246  ReorderBufferReturnChange(rb, cleanup, true);
4247  }
4248  txn->nentries_mem = 0;
4249  Assert(dlist_is_empty(&txn->changes));
4250 
4251  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4252 
4253  while (restored < max_changes_in_memory && *segno <= last_segno)
4254  {
4255  int readBytes;
4256  ReorderBufferDiskChange *ondisk;
4257 
4258  CHECK_FOR_INTERRUPTS();
4259 
4260  if (*fd == -1)
4261  {
4262  char path[MAXPGPATH];
4263 
4264  /* first time in */
4265  if (*segno == 0)
4266  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4267 
4268  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4269 
4270  /*
4271  * No need to care about TLIs here, only used during a single run,
4272  * so each LSN only maps to a specific WAL record.
4273  */
4274  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4275  *segno);
4276 
4277  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4278 
4279  /* No harm in resetting the offset even in case of failure */
4280  file->curOffset = 0;
4281 
4282  if (*fd < 0 && errno == ENOENT)
4283  {
4284  *fd = -1;
4285  (*segno)++;
4286  continue;
4287  }
4288  else if (*fd < 0)
4289  ereport(ERROR,
4290  (errcode_for_file_access(),
4291  errmsg("could not open file \"%s\": %m",
4292  path)));
4293  }
4294 
4295  /*
4296  * Read the statically sized part of a change which has information
4297  * about the total size. If we couldn't read a record, we're at the
4298  * end of this file.
4299  */
4300  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4301  readBytes = FileRead(file->vfd, rb->outbuf,
4302  sizeof(ReorderBufferDiskChange),
4303  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4304 
4305  /* eof */
4306  if (readBytes == 0)
4307  {
4308  FileClose(*fd);
4309  *fd = -1;
4310  (*segno)++;
4311  continue;
4312  }
4313  else if (readBytes < 0)
4314  ereport(ERROR,
4315  (errcode_for_file_access(),
4316  errmsg("could not read from reorderbuffer spill file: %m")));
4317  else if (readBytes != sizeof(ReorderBufferDiskChange))
4318  ereport(ERROR,
4319  (errcode_for_file_access(),
4320  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4321  readBytes,
4322  (uint32) sizeof(ReorderBufferDiskChange))));
4323 
4324  file->curOffset += readBytes;
4325 
4326  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4327 
4328  ReorderBufferSerializeReserve(rb,
4329  sizeof(ReorderBufferDiskChange) + ondisk->size);
4330  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4331 
4332  readBytes = FileRead(file->vfd,
4333  rb->outbuf + sizeof(ReorderBufferDiskChange),
4334  ondisk->size - sizeof(ReorderBufferDiskChange),
4335  file->curOffset,
4336  WAIT_EVENT_REORDER_BUFFER_READ);
4337 
4338  if (readBytes < 0)
4339  ereport(ERROR,
4340  (errcode_for_file_access(),
4341  errmsg("could not read from reorderbuffer spill file: %m")));
4342  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4343  ereport(ERROR,
4344  (errcode_for_file_access(),
4345  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4346  readBytes,
4347  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4348 
4349  file->curOffset += readBytes;
4350 
4351  /*
4352  * ok, read a full change from disk, now restore it into proper
4353  * in-memory format
4354  */
4355  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4356  restored++;
4357  }
4358 
4359  return restored;
4360 }
static void cleanup(void)
Definition: bootstrap.c:682
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer *rb,
ReorderBufferTXN *txn 
)
static

Definition at line 4539 of file reorderbuffer.c.

4540 {
4541  XLogSegNo first;
4542  XLogSegNo cur;
4543  XLogSegNo last;
4544 
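4545  Assert(txn->first_lsn != InvalidXLogRecPtr);
4546  Assert(txn->final_lsn != InvalidXLogRecPtr);
4547 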
4548  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4549  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4550 
4551  /* iterate over all possible filenames, and delete them */
4552  for (cur = first; cur <= last; cur++)
4553  {
4554  char path[MAXPGPATH];
4555 
4556  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4557  if (unlink(path) != 0 && errno != ENOENT)
4558  ereport(ERROR,
4559  (errcode_for_file_access(),
4560  errmsg("could not remove file \"%s\": %m", path)));
4561  }
4562 }
struct cursor * cur
Definition: ecpg.c:28

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer *rb,
ReorderBufferChange *change,
bool  upd_mem 
)

Definition at line 492 of file reorderbuffer.c.

494 {
495  /* update memory accounting info */
496  if (upd_mem)
497  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
498  ReorderBufferChangeSize(change));
499 
500  /* free contained data */
501  switch (change->action)
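502  {
503  case REORDER_BUFFER_CHANGE_INSERT:
504  case REORDER_BUFFER_CHANGE_UPDATE:
505  case REORDER_BUFFER_CHANGE_DELETE:
506  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: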
507  if (change->data.tp.newtuple)
508  {
509  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
510  change->data.tp.newtuple = NULL;
511  }
512 
513  if (change->data.tp.oldtuple)
514  {
515  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
516  change->data.tp.oldtuple = NULL;
517  }
518  break;
519  case REORDER_BUFFER_CHANGE_MESSAGE:
520  if (change->data.msg.prefix != NULL)
521  pfree(change->data.msg.prefix);
522  change->data.msg.prefix = NULL;
523  if (change->data.msg.message != NULL)
524  pfree(change->data.msg.message);
525  change->data.msg.message = NULL;
526  break;
527  case REORDER_BUFFER_CHANGE_INVALIDATION:
528  if (change->data.inval.invalidations)
529  pfree(change->data.inval.invalidations);
530  change->data.inval.invalidations = NULL;
531  break;
532  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
533  if (change->data.snapshot)
534  {
535  ReorderBufferFreeSnap(rb, change->data.snapshot);
536  change->data.snapshot = NULL;
537  }
538  break;
539  /* no data in addition to the struct itself */
540  case REORDER_BUFFER_CHANGE_TRUNCATE:
541  if (change->data.truncate.relids != NULL)
542  {
543  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
544  change->data.truncate.relids = NULL;
545  }
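546  break;
547  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
548  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
549  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
550  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: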
551  break;
552  }
553 
554  pfree(change);
555 }
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer *rb,
Oid *relids 
)

Definition at line 611 of file reorderbuffer.c.

612 {
613  pfree(relids);
614 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 580 of file reorderbuffer.c.

581 {
582  pfree(tuple);
583 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer *rb,
ReorderBufferTXN *txn 
)
static

Definition at line 438 of file reorderbuffer.c.

439 {
440  /* clean the lookup cache if we were cached (quite likely) */
441  if (rb->by_txn_last_xid == txn->xid)
442  {
443  rb->by_txn_last_xid = InvalidTransactionId;
444  rb->by_txn_last_txn = NULL;
445  }
446 
447  /* free data that's contained */
448 
449  if (txn->gid != NULL)
450  {
451  pfree(txn->gid);
452  txn->gid = NULL;
453  }
454 
455  if (txn->tuplecid_hash != NULL)
456  {
457  hash_destroy(txn->tuplecid_hash);
458  txn->tuplecid_hash = NULL;
459  }
460 
461  if (txn->invalidations)
462  {
463  pfree(txn->invalidations);
464  txn->invalidations = NULL;
465  }
466 
467  /* Reset the toast hash */
468  ReorderBufferToastReset(rb, txn);
469 
470  pfree(txn);
471 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2016 of file reorderbuffer.c.

2018 {
2019  txn->command_id = command_id;
2020 
2021  /* Avoid copying if it's already copied. */
2022  if (snapshot_now->copied)
2023  txn->snapshot_now = snapshot_now;
2024  else
2025  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2026  txn, command_id);
2027 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
int  fd,
ReorderBufferChange *change 
)
static

Definition at line 3776 of file reorderbuffer.c.

3778 {
3779  ReorderBufferDiskChange *ondisk;
3780  Size sz = sizeof(ReorderBufferDiskChange);
3781 
3782  ReorderBufferSerializeReserve(rb, sz);
3783 
3784  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3785  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3786 
3787  switch (change->action)
3788  {
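3789  /* fall through these, they're all similar enough */
3790  case REORDER_BUFFER_CHANGE_INSERT:
3791  case REORDER_BUFFER_CHANGE_UPDATE:
3792  case REORDER_BUFFER_CHANGE_DELETE:
3793  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: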
3794  {
3795  char *data;
3796  HeapTuple oldtup,
3797  newtup;
3798  Size oldlen = 0;
3799  Size newlen = 0;
3800 
3801  oldtup = change->data.tp.oldtuple;
3802  newtup = change->data.tp.newtuple;
3803 
3804  if (oldtup)
3805  {
3806  sz += sizeof(HeapTupleData);
3807  oldlen = oldtup->t_len;
3808  sz += oldlen;
3809  }
3810 
3811  if (newtup)
3812  {
3813  sz += sizeof(HeapTupleData);
3814  newlen = newtup->t_len;
3815  sz += newlen;
3816  }
3817 
3818  /* make sure we have enough space */
3819  ReorderBufferSerializeReserve(rb, sz);
3820 
3821  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3822  /* might have been reallocated above */
3823  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3824 
3825  if (oldlen)
3826  {
3827  memcpy(data, oldtup, sizeof(HeapTupleData));
3828  data += sizeof(HeapTupleData);
3829 
3830  memcpy(data, oldtup->t_data, oldlen);
3831  data += oldlen;
3832  }
3833 
3834  if (newlen)
3835  {
3836  memcpy(data, newtup, sizeof(HeapTupleData));
3837  data += sizeof(HeapTupleData);
3838 
3839  memcpy(data, newtup->t_data, newlen);
3840  data += newlen;
3841  }
3842  break;
3843  }
3844  case REORDER_BUFFER_CHANGE_MESSAGE:
3845  {
3846  char *data;
3847  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3848 
3849  sz += prefix_size + change->data.msg.message_size +
3850  sizeof(Size) + sizeof(Size);
3851  ReorderBufferSerializeReserve(rb, sz);
3852 
3853  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3854 
3855  /* might have been reallocated above */
3856  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3857 
3858  /* write the prefix including the size */
3859  memcpy(data, &prefix_size, sizeof(Size));
3860  data += sizeof(Size);
3861  memcpy(data, change->data.msg.prefix,
3862  prefix_size);
3863  data += prefix_size;
3864 
3865  /* write the message including the size */
3866  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3867  data += sizeof(Size);
3868  memcpy(data, change->data.msg.message,
3869  change->data.msg.message_size);
3870  data += change->data.msg.message_size;
3871 
3872  break;
3873  }
3874  case REORDER_BUFFER_CHANGE_INVALIDATION:
3875  {
3876  char *data;
3877  Size inval_size = sizeof(SharedInvalidationMessage) *
3878  change->data.inval.ninvalidations;
3879 
3880  sz += inval_size;
3881 
3882  ReorderBufferSerializeReserve(rb, sz);
3883  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3884 
3885  /* might have been reallocated above */
3886  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3887  memcpy(data, change->data.inval.invalidations, inval_size);
3888  data += inval_size;
3889 
3890  break;
3891  }
3892  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3893  {
3894  Snapshot snap;
3895  char *data;
3896 
3897  snap = change->data.snapshot;
3898 
3899  sz += sizeof(SnapshotData) +
3900  sizeof(TransactionId) * snap->xcnt +
3901  sizeof(TransactionId) * snap->subxcnt;
3902 
3903  /* make sure we have enough space */
3904  ReorderBufferSerializeReserve(rb, sz);
3905  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3906  /* might have been reallocated above */
3907  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3908 
3909  memcpy(data, snap, sizeof(SnapshotData));
3910  data += sizeof(SnapshotData);
3911 
3912  if (snap->xcnt)
3913  {
3914  memcpy(data, snap->xip,
3915  sizeof(TransactionId) * snap->xcnt);
3916  data += sizeof(TransactionId) * snap->xcnt;
3917  }
3918 
3919  if (snap->subxcnt)
3920  {
3921  memcpy(data, snap->subxip,
3922  sizeof(TransactionId) * snap->subxcnt);
3923  data += sizeof(TransactionId) * snap->subxcnt;
3924  }
3925  break;
3926  }
3927  case REORDER_BUFFER_CHANGE_TRUNCATE:
3928  {
3929  Size size;
3930  char *data;
3931 
3932  /* account for the OIDs of truncated relations */
3933  size = sizeof(Oid) * change->data.truncate.nrelids;
3934  sz += size;
3935 
3936  /* make sure we have enough space */
3937  ReorderBufferSerializeReserve(rb, sz);
3938 
3939  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3940  /* might have been reallocated above */
3941  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3942 
3943  memcpy(data, change->data.truncate.relids, size);
3944  data += size;
3945 
3946  break;
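3947  }
3948  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3949  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3950  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3951  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: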
3952  /* ReorderBufferChange contains everything important */
3953  break;
3954  }
3955 
3956  ondisk->size = sz;
3957 
3958  errno = 0;
3959  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3960  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3961  {
3962  int save_errno = errno;
3963 
3964  CloseTransientFile(fd);
3965 
3966  /* if write didn't set errno, assume problem is no disk space */
3967  errno = save_errno ? save_errno : ENOSPC;
3968  ereport(ERROR,
3969  (errcode_for_file_access(),
3970  errmsg("could not write to data file for XID %u: %m",
3971  txn->xid)));
3972  }
3973  pgstat_report_wait_end();
3974 
3975  /*
3976  * Keep the transaction's final_lsn up to date with each change we send to
3977  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3978  * only do this on commit and abort records, but that doesn't work if a
3979  * system crash leaves a transaction without its abort record).
3980  *
3981  * Make sure not to move it backwards.
3982  */
3983  if (txn->final_lsn < change->lsn)
3984  txn->final_lsn = change->lsn;
3985 
3986  Assert(ondisk->change.action == change->action);
3987 }
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot *slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4608 of file reorderbuffer.c.

4610 {
4611  XLogRecPtr recptr;
4612 
4613  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4614 
4615  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4616  NameStr(MyReplicationSlot->data.name),
4617  xid, LSN_FORMAT_ARGS(recptr));
4618 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer *rb,
Size  sz 
)
static

Definition at line 3491 of file reorderbuffer.c.

3492 {
3493  if (!rb->outbufsize)
3494  {
3495  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3496  rb->outbufsize = sz;
3497  }
3498  else if (rb->outbufsize < sz)
3499  {
3500  rb->outbuf = repalloc(rb->outbuf, sz);
3501  rb->outbufsize = sz;
3502  }
3503 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer *rb,
ReorderBufferTXN *txn 
)
static

Definition at line 3681 of file reorderbuffer.c.

3682 {
3683  dlist_iter subtxn_i;
3684  dlist_mutable_iter change_i;
3685  int fd = -1;
3686  XLogSegNo curOpenSegNo = 0;
3687  Size spilled = 0;
3688  Size size = txn->size;
3689 
3690  elog(DEBUG2, "spill %u changes in XID %u to disk",
3691  (uint32) txn->nentries_mem, txn->xid);
3692 
3693  /* do the same to all child TXs */
3694  dlist_foreach(subtxn_i, &txn->subtxns)
3695  {
3696  ReorderBufferTXN *subtxn;
3697 
3698  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3699  ReorderBufferSerializeTXN(rb, subtxn);
3700  }
3701 
3702  /* serialize changestream */
3703  dlist_foreach_modify(change_i, &txn->changes)
3704  {
3705  ReorderBufferChange *change;
3706 
3707  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3708 
3709  /*
3710  * store in segment in which it belongs by start lsn, don't split over
3711  * multiple segments though
3712  */
3713  if (fd == -1 ||
3714  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3715  {
3716  char path[MAXPGPATH];
3717 
3718  if (fd != -1)
3719  CloseTransientFile(fd);
3720 
3721  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3722 
3723  /*
3724  * No need to care about TLIs here, only used during a single run,
3725  * so each LSN only maps to a specific WAL record.
3726  */
3727  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3728  curOpenSegNo);
3729 
3730  /* open segment, create it if necessary */
3731  fd = OpenTransientFile(path,
3732  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3733 
3734  if (fd < 0)
3735  ereport(ERROR,
3736  (errcode_for_file_access(),
3737  errmsg("could not open file \"%s\": %m", path)));
3738  }
3739 
3740  ReorderBufferSerializeChange(rb, txn, fd, change);
3741  dlist_delete(&change->node);
3742  ReorderBufferReturnChange(rb, change, false);
3743 
3744  spilled++;
3745  }
3746 
3747  /* Update the memory counter */
3748  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3749 
3750  /* update the statistics iff we have spilled anything */
3751  if (spilled)
3752  {
3753  rb->spillCount += 1;
3754  rb->spillBytes += size;
3755 
3756  /* don't consider already serialized transactions */
3757  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3758 
3759  /* update the decoding stats */
3760  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3761  }
3762 
3763  Assert(spilled == txn->nentries_mem);
3764  Assert(dlist_is_empty(&txn->changes));
3765  txn->nentries_mem = 0;
3766  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3767 
3768  if (fd != -1)
3769  CloseTransientFile(fd);
3770 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1931
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), size, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3133 of file reorderbuffer.c.

3135 {
3136  ReorderBufferTXN *txn;
3137  bool is_new;
3138 
3139  Assert(snap != NULL);
3140 
3141  /*
3142  * Fetch the transaction to operate on. If we know it's a subtransaction,
3143  * operate on its top-level transaction instead.
3144  */
3145  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3146  if (rbtxn_is_known_subxact(txn))
3147  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3148  NULL, InvalidXLogRecPtr, false);
3149  Assert(txn->base_snapshot == NULL);
3150 
3151  txn->base_snapshot = snap;
3152  txn->base_snapshot_lsn = lsn;
3153  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3154 
3155  AssertTXNLsnOrder(rb);
3156 }

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1057 of file reorderbuffer.c.

1058 {
1059  rb->current_restart_decoding_lsn = ptr;
1060 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2761 of file reorderbuffer.c.

2762 {
2763  ReorderBufferTXN *txn;
2764 
2765  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2766 
2767  /* unknown transaction, nothing to do */
2768  if (txn == NULL)
2769  return;
2770 
2771  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2772 }
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1881 of file reorderbuffer.c.

1882 {
1883  /* we should only call this for previously streamed transactions */
1884  Assert(rbtxn_is_streamed(txn));
1885 
1886  ReorderBufferStreamTXN(rb, txn);
1887 
1888  if (rbtxn_prepared(txn))
1889  {
1890  /*
1891  * Note, we send stream prepare even if a concurrent abort is
1892  * detected. See DecodePrepare for more information.
1893  */
1894  rb->stream_prepare(rb, txn, txn->final_lsn);
1895 
1896  /*
1897  * This is a PREPARED transaction, part of a two-phase commit. The
1898  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1899  * just truncate txn by removing changes and tuplecids.
1900  */
1901  ReorderBufferTruncateTXN(rb, txn, true);
1902  /* Reset the CheckXidAlive */
1903  CheckXidAlive = InvalidTransactionId;
1904  }
1905  else
1906  {
1907  rb->stream_commit(rb, txn, txn->final_lsn);
1908  ReorderBufferCleanupTXN(rb, txn);
1909  }
1910 }
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert, CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4026 of file reorderbuffer.c.

4027 {
4028  Snapshot snapshot_now;
4029  CommandId command_id;
4030  Size stream_bytes;
4031  bool txn_is_streamed;
4032 
4033  /* We can never reach here for a subtransaction. */
4034  Assert(rbtxn_is_toptxn(txn));
4035 
4036  /*
4037  * We can't make any assumptions about base snapshot here, similar to what
4038  * ReorderBufferCommit() does. That relies on base_snapshot getting
4039  * transferred from subxact in ReorderBufferCommitChild(), but that was
4040  * not yet called as the transaction is in-progress.
4041  *
4042  * So just walk the subxacts and use the same logic here. But we only need
4043  * to do that once, when the transaction is streamed for the first time.
4044  * After that we need to reuse the snapshot from the previous run.
4045  *
4046  * Unlike DecodeCommit which adds xids of all the subtransactions in
4047  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4048  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4049  * allows the catalog changes made in subtransactions decoded till now to
4050  * be visible.
4051  */
4052  if (txn->snapshot_now == NULL)
4053  {
4054  dlist_iter subxact_i;
4055 
4056  /* make sure this transaction is streamed for the first time */
4057  Assert(!rbtxn_is_streamed(txn));
4058 
4059  /* at the beginning we should have invalid command ID */
4060  Assert(txn->command_id == InvalidCommandId);
4061 
4062  dlist_foreach(subxact_i, &txn->subtxns)
4063  {
4064  ReorderBufferTXN *subtxn;
4065 
4066  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4067  ReorderBufferTransferSnapToParent(txn, subtxn);
4068  }
4069 
4070  /*
4071  * If this transaction has no snapshot, it didn't make any changes to
4072  * the database till now, so there's nothing to decode.
4073  */
4074  if (txn->base_snapshot == NULL)
4075  {
4076  Assert(txn->ninvalidations == 0);
4077  return;
4078  }
4079 
4080  command_id = FirstCommandId;
4081  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4082  txn, command_id);
4083  }
4084  else
4085  {
4086  /* the transaction must have been already streamed */
4087  Assert(rbtxn_is_streamed(txn));
4088 
4089  /*
4090  * Nah, we already have snapshot from the previous streaming run. We
4091  * assume new subxacts can't move the LSN backwards, and so can't beat
4092  * the LSN condition in the previous branch (so no need to walk
4093  * through subxacts again). In fact, we must not do that as we may be
4094  * using snapshot half-way through the subxact.
4095  */
4096  command_id = txn->command_id;
4097 
4098  /*
4099  * We can't use txn->snapshot_now directly because after the last
4100  * streaming run, we might have got some new sub-transactions. So we
4101  * need to add them to the snapshot.
4102  */
4103  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4104  txn, command_id);
4105 
4106  /* Free the previously copied snapshot. */
4107  Assert(txn->snapshot_now->copied);
4108  ReorderBufferFreeSnap(rb, txn->snapshot_now);
4109  txn->snapshot_now = NULL;
4110  }
4111 
4112  /*
4113  * Remember this information to be used later to update stats. We can't
4114  * update the stats here as an error while processing the changes would
4115  * lead to the accumulation of stats even though we haven't streamed all
4116  * the changes.
4117  */
4118  txn_is_streamed = rbtxn_is_streamed(txn);
4119  stream_bytes = txn->total_size;
4120 
4121  /* Process and send the changes to output plugin. */
4122  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4123  command_id, true);
4124 
4125  rb->streamCount += 1;
4126  rb->streamBytes += stream_bytes;
4127 
4128  /* Don't consider already streamed transaction. */
4129  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4130 
4131  /* update the decoding stats */
4132  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4133 
4134  Assert(dlist_is_empty(&txn->changes));
4135  Assert(txn->nentries == 0);
4136  Assert(txn->nentries_mem == 0);
4137 }

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 4679 of file reorderbuffer.c.

4681 {
4682  ReorderBufferToastEnt *ent;
4683  HeapTuple newtup;
4684  bool found;
4685  int32 chunksize;
4686  bool isnull;
4687  Pointer chunk;
4688  TupleDesc desc = RelationGetDescr(relation);
4689  Oid chunk_id;
4690  int32 chunk_seq;
4691 
4692  if (txn->toast_hash == NULL)
4693  ReorderBufferToastInitHash(rb, txn);
4694 
4695  Assert(IsToastRelation(relation));
4696 
4697  newtup = change->data.tp.newtuple;
4698  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4699  Assert(!isnull);
4700  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4701  Assert(!isnull);
4702 
4703  ent = (ReorderBufferToastEnt *)
4704  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4705 
4706  if (!found)
4707  {
4708  Assert(ent->chunk_id == chunk_id);
4709  ent->num_chunks = 0;
4710  ent->last_chunk_seq = 0;
4711  ent->size = 0;
4712  ent->reconstructed = NULL;
4713  dlist_init(&ent->chunks);
4714 
4715  if (chunk_seq != 0)
4716  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4717  chunk_seq, chunk_id);
4718  }
4719  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4720  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4721  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4722 
4723  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4724  Assert(!isnull);
4725 
4726  /* calculate size so we can allocate the right size at once later */
4727  if (!VARATT_IS_EXTENDED(chunk))
4728  chunksize = VARSIZE(chunk) - VARHDRSZ;
4729  else if (VARATT_IS_SHORT(chunk))
4730  /* could happen due to heap_form_tuple doing its thing */
4731  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4732  else
4733  elog(ERROR, "unexpected type of toast chunk");
4734 
4735  ent->size += chunksize;
4736  ent->last_chunk_seq = chunk_seq;
4737  ent->num_chunks++;
4738  dlist_push_tail(&ent->chunks, &change->node);
4739 }
char * Pointer
Definition: c.h:483
#define VARHDRSZ
Definition: c.h:692
uint64 chunk
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:749
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define RelationGetDescr(relation)
Definition: rel.h:531
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define VARSIZE(PTR)
Definition: varatt.h:279

References Assert, chunk, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4659 of file reorderbuffer.c.

4660 {
4661  HASHCTL hash_ctl;
4662 
4663  Assert(txn->toast_hash == NULL);
4664 
4665  hash_ctl.keysize = sizeof(Oid);
4666  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4667  hash_ctl.hcxt = rb->context;
4668  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4669  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4670 }
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 4762 of file reorderbuffer.c.

4764 {
4765  TupleDesc desc;
4766  int natt;
4767  Datum *attrs;
4768  bool *isnull;
4769  bool *free;
4770  HeapTuple tmphtup;
4771  Relation toast_rel;
4772  TupleDesc toast_desc;
4773  MemoryContext oldcontext;
4774  HeapTuple newtup;
4775  Size old_size;
4776 
4777  /* no toast tuples changed */
4778  if (txn->toast_hash == NULL)
4779  return;
4780 
4781  /*
4782  * We're going to modify the size of the change. So, to make sure the
4783  * accounting is correct we record the current change size and then after
4784  * re-computing the change we'll subtract the recorded size and then
4785  * re-add the new change size at the end. We don't immediately subtract
4786  * the old size because if there is any error before we add the new size,
4787  * we will release the changes and that will update the accounting info
4788  * (subtracting the size from the counters). And we don't want to
4789  * underflow there.
4790  */
4791  old_size = ReorderBufferChangeSize(change);
4792 
4793  oldcontext = MemoryContextSwitchTo(rb->context);
4794 
4795  /* we should only have toast tuples in an INSERT or UPDATE */
4796  Assert(change->data.tp.newtuple);
4797 
4798  desc = RelationGetDescr(relation);
4799 
4800  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4801  if (!RelationIsValid(toast_rel))
4802  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4803  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4804 
4805  toast_desc = RelationGetDescr(toast_rel);
4806 
4807  /* should we allocate from stack instead? */
4808  attrs = palloc0(sizeof(Datum) * desc->natts);
4809  isnull = palloc0(sizeof(bool) * desc->natts);
4810  free = palloc0(sizeof(bool) * desc->natts);
4811 
4812  newtup = change->data.tp.newtuple;
4813 
4814  heap_deform_tuple(newtup, desc, attrs, isnull);
4815 
4816  for (natt = 0; natt < desc->natts; natt++)
4817  {
4818  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4819  ReorderBufferToastEnt *ent;
4820  struct varlena *varlena;
4821 
4822  /* va_rawsize is the size of the original datum -- including header */
4823  struct varatt_external toast_pointer;
4824  struct varatt_indirect redirect_pointer;
4825  struct varlena *new_datum = NULL;
4826  struct varlena *reconstructed;
4827  dlist_iter it;
4828  Size data_done = 0;
4829 
4830  /* system columns aren't toasted */
4831  if (attr->attnum < 0)
4832  continue;
4833 
4834  if (attr->attisdropped)
4835  continue;
4836 
4837  /* not a varlena datatype */
4838  if (attr->attlen != -1)
4839  continue;
4840 
4841  /* no data */
4842  if (isnull[natt])
4843  continue;
4844 
4845  /* ok, we know we have a toast datum */
4846  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4847 
4848  /* no need to do anything if the tuple isn't external */
4849  if (!VARATT_IS_EXTERNAL(varlena))
4850  continue;
4851 
4852  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4853 
4854  /*
4855  * Check whether the toast tuple changed, replace if so.
4856  */
4857  ent = (ReorderBufferToastEnt *)
4858  hash_search(txn->toast_hash,
4859  &toast_pointer.va_valueid,
4860  HASH_FIND,
4861  NULL);
4862  if (ent == NULL)
4863  continue;
4864 
4865  new_datum =
4866  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4867 
4868  free[natt] = true;
4869 
4870  reconstructed = palloc0(toast_pointer.va_rawsize);
4871 
4872  ent->reconstructed = reconstructed;
4873 
4874  /* stitch toast tuple back together from its parts */
4875  dlist_foreach(it, &ent->chunks)
4876  {
4877  bool cisnull;
4878  ReorderBufferChange *cchange;
4879  HeapTuple ctup;
4880  Pointer chunk;
4881 
4882  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4883  ctup = cchange->data.tp.newtuple;
4884  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4885 
4886  Assert(!cisnull);
4887  Assert(!VARATT_IS_EXTERNAL(chunk));
4888  Assert(!VARATT_IS_SHORT(chunk));
4889 
4890  memcpy(VARDATA(reconstructed) + data_done,
4891  VARDATA(chunk),
4892  VARSIZE(chunk) - VARHDRSZ);
4893  data_done += VARSIZE(chunk) - VARHDRSZ;
4894  }
4895  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4896 
4897  /* make sure it's marked as compressed or not */
4898  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4899  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4900  else
4901  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4902 
4903  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4904  redirect_pointer.pointer = reconstructed;
4905 
4906  SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4907  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4908  sizeof(redirect_pointer));
4909 
4910  attrs[natt] = PointerGetDatum(new_datum);
4911  }
4912 
4913  /*
4914  * Build tuple in separate memory & copy tuple back into the tuplebuf
4915  * passed to the output plugin. We can't directly heap_fill_tuple() into
4916  * the tuplebuf because attrs[] will point back into the current content.
4917  */
4918  tmphtup = heap_form_tuple(desc, attrs, isnull);
4919  Assert(newtup->t_len <= MaxHeapTupleSize);
4920  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4921 
4922  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4923  newtup->t_len = tmphtup->t_len;
4924 
4925  /*
4926  * free resources we won't further need, more persistent stuff will be
4927  * free'd in ReorderBufferToastReset().
4928  */
4929  RelationClose(toast_rel);
4930  pfree(tmphtup);
4931  for (natt = 0; natt < desc->natts; natt++)
4932  {
4933  if (free[natt])
4934  pfree(DatumGetPointer(attrs[natt]));
4935  }
4936  pfree(attrs);
4937  pfree(free);
4938  pfree(isnull);
4939 
4940  MemoryContextSwitchTo(oldcontext);
4941 
4942  /* subtract the old change size */
4943  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
4944  /* now add the change back, with the correct size */
4945  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4946  ReorderBufferChangeSize(change));
4947 }
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1345
#define MaxHeapTupleSize
Definition: htup_details.h:558
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define RelationGetRelationName(relation)
Definition: rel.h:539
Definition: c.h:687
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86

References Assert, chunk, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescAttr, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4953 of file reorderbuffer.c.

4954 {
4955  HASH_SEQ_STATUS hstat;
4956  ReorderBufferToastEnt *ent;
4957 
4958  if (txn->toast_hash == NULL)
4959  return;
4960 
4961  /* sequentially walk over the hash and free everything */
4962  hash_seq_init(&hstat, txn->toast_hash);
4963  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4964  {
4965  dlist_mutable_iter it;
4966 
4967  if (ent->reconstructed != NULL)
4968  pfree(ent->reconstructed);
4969 
4970  dlist_foreach_modify(it, &ent->chunks)
4971  {
4972  ReorderBufferChange *change =
4973  dlist_container(ReorderBufferChange, node, it.cur);
4974 
4975  dlist_delete(&change->node);
4976  ReorderBufferReturnChange(rb, change, true);
4977  }
4978  }
4979 
4980  hash_destroy(txn->toast_hash);
4981  txn->toast_hash = NULL;
4982 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferReturnTXN().

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN txn,
ReorderBufferTXN subtxn 
)
static

Definition at line 1135 of file reorderbuffer.c.

1137 {
1138  Assert(subtxn->toplevel_xid == txn->xid);
1139 
1140  if (subtxn->base_snapshot != NULL)
1141  {
1142  if (txn->base_snapshot == NULL ||
1143  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1144  {
1145  /*
1146  * If the toplevel transaction already has a base snapshot but
1147  * it's newer than the subxact's, purge it.
1148  */
1149  if (txn->base_snapshot != NULL)
1150  {
1151  SnapBuildSnapDecRefcount(txn->base_snapshot);
1152  dlist_delete(&txn->base_snapshot_node);
1153  }
1154 
1155  /*
1156  * The snapshot is now the top transaction's; transfer it, and
1157  * adjust the list position of the top transaction in the list by
1158  * moving it to where the subtransaction is.
1159  */
1160  txn->base_snapshot = subtxn->base_snapshot;
1161  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1162  dlist_insert_before(&subtxn->base_snapshot_node,
1163  &txn->base_snapshot_node);
1164 
1165  /*
1166  * The subtransaction doesn't have a snapshot anymore (so it
1167  * mustn't be in the list.)
1168  */
1169  subtxn->base_snapshot = NULL;
1170  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1171  dlist_delete(&subtxn->base_snapshot_node);
1172  }
1173  else
1174  {
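1175  /* Base snap of toplevel is fine, so subxact's is not needed */
1176  SnapBuildSnapDecRefcount(subtxn->base_snapshot);
1177  dlist_delete(&subtxn->base_snapshot_node);
1178  subtxn->base_snapshot = NULL;
1179  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1180  }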
1181  }
1182 }
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

static void ReorderBufferTruncateTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
bool  txn_prepared 
)
static

Definition at line 1616 of file reorderbuffer.c.

1617 {
1618  dlist_mutable_iter iter;
1619 
1620  /* cleanup subtransactions & their changes */
1621  dlist_foreach_modify(iter, &txn->subtxns)
1622  {
1623  ReorderBufferTXN *subtxn;
1624 
1625  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1626 
1627  /*
1628  * Subtransactions are always associated to the toplevel TXN, even if
1629  * they originally were happening inside another subtxn, so we won't
1630  * ever recurse more than one level deep here.
1631  */
1632  Assert(rbtxn_is_known_subxact(subtxn));
1633  Assert(subtxn->nsubtxns == 0);
1634 
1635  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1636  }
1637 
1638  /* cleanup changes in the txn */
1639  dlist_foreach_modify(iter, &txn->changes)
1640  {
1641  ReorderBufferChange *change;
1642 
1643  change = dlist_container(ReorderBufferChange, node, iter.cur);
1644 
1645  /* Check we're not mixing changes from different transactions. */
1646  Assert(change->txn == txn);
1647 
1648  /* remove the change from its containing list */
1649  dlist_delete(&change->node);
1650 
1651  ReorderBufferReturnChange(rb, change, false);
1652  }
1653 
1654  /* Update the memory counter */
1655  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1656 
1657  /*
1658  * Mark the transaction as streamed.
1659  *
1660  * The top-level transaction, is marked as streamed always, even if it
1661  * does not contain any changes (that is, when all the changes are in
1662  * subtransactions).
1663  *
1664  * For subtransactions, we only mark them as streamed when there are
1665  * changes in them.
1666  *
1667  * We do it this way because of aborts - we don't want to send aborts for
1668  * XIDs the downstream is not aware of. And of course, it always knows
1669  * about the toplevel xact (we send the XID in all messages), but we never
1670  * stream XIDs of empty subxacts.
1671  */
1672  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1673  txn->txn_flags |= RBTXN_IS_STREAMED;
1674 
1675  if (txn_prepared)
1676  {
1677  /*
1678  * If this is a prepared txn, cleanup the tuplecids we stored for
1679  * decoding catalog snapshot access. They are always stored in the
1680  * toplevel transaction.
1681  */
1682  dlist_foreach_modify(iter, &txn->tuplecids)
1683  {
1684  ReorderBufferChange *change;
1685 
1686  change = dlist_container(ReorderBufferChange, node, iter.cur);
1687 
1688  /* Check we're not mixing changes from different transactions. */
1689  Assert(change->txn == txn);
1690  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1691 
1692  /* Remove the change from its containing list. */
1693  dlist_delete(&change->node);
1694 
1695  ReorderBufferReturnChange(rb, change, true);
1696  }
1697  }
1698 
1699  /*
1700  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1701  * memory. We could also keep the hash table and update it with new ctid
1702  * values, but this seems simpler and good enough for now.
1703  */
1704  if (txn->tuplecid_hash != NULL)
1705  {
1706  hash_destroy(txn->tuplecid_hash);
1707  txn->tuplecid_hash = NULL;
1708  }
1709 
1710  /* If this txn is serialized then clean the disk space. */
1711  if (rbtxn_is_serialized(txn))
1712  {
1713  ReorderBufferRestoreCleanup(rb, txn);
1714  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1715 
1716  /*
1717  * We set this flag to indicate if the transaction is ever serialized.
1718  * We need this to accurately update the stats as otherwise the same
1719  * transaction can be counted as serialized multiple times.
1720  */
1721  txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
1722  }
1723 
1724  /* also reset the number of entries in the transaction */
1725  txn->nentries_mem = 0;
1726  txn->nentries = 0;
1727 }
#define RBTXN_IS_STREAMED
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, RBTXN_IS_STREAMED, rbtxn_is_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferStreamCommit().

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 623 of file reorderbuffer.c.

625 {
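626  ReorderBufferTXN *txn;
627  ReorderBufferTXNByIdEnt *ent;
628  bool found;
629 
630  Assert(TransactionIdIsValid(xid));
631 
632  /*
633  * Check the one-entry lookup cache first
634  */
635  if (TransactionIdIsValid(rb->by_txn_last_xid) &&
636  rb->by_txn_last_xid == xid)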
637  {
638  txn = rb->by_txn_last_txn;
639 
640  if (txn != NULL)
641  {
642  /* found it, and it's valid */
643  if (is_new)
644  *is_new = false;
645  return txn;
646  }
647 
648  /*
649  * cached as non-existent, and asked not to create? Then nothing else
650  * to do.
651  */
652  if (!create)
653  return NULL;
654  /* otherwise fall through to create it */
655  }
656 
657  /*
658  * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
659  * create an entry.
660  */
661 
662  /* search the lookup table */
663  ent = (ReorderBufferTXNByIdEnt *)
664  hash_search(rb->by_txn,
665  &xid,
666  create ? HASH_ENTER : HASH_FIND,
667  &found);
668  if (found)
669  txn = ent->txn;
670  else if (create)
671  {
672  /* initialize the new entry, if creation was requested */
673  Assert(ent != NULL);
674  Assert(lsn != InvalidXLogRecPtr);
675 
676  ent->txn = ReorderBufferGetTXN(rb);
677  ent->txn->xid = xid;
678  txn = ent->txn;
679  txn->first_lsn = lsn;
680  txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
681 
682  if (create_as_top)
683  {
684  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
685  AssertTXNLsnOrder(rb);
686  }
687  }
688  else
689  txn = NULL; /* not found and not asked to create */
690 
691  /* update cache */
692  rb->by_txn_last_xid = xid;
693  rb->by_txn_last_txn = txn;
694 
695  if (is_new)
696  *is_new = !found;
697 
698  Assert(!create || txn != NULL);
699  return txn;
700 }
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
ReorderBufferTXN * txn
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferTXNSizeCompare()

static int ReorderBufferTXNSizeCompare ( const pairingheap_node a,
const pairingheap_node b,
void *  arg 
)
static

Definition at line 3508 of file reorderbuffer.c.

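3509 {
3510  const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
3511  const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);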
3512 
3513  if (ta->size < tb->size)
3514  return -1;
3515  if (ta->size > tb->size)
3516  return 1;
3517  return 0;
3518 }
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and ReorderBufferTXN::size.

Referenced by ReorderBufferAllocate().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3461 of file reorderbuffer.c.

3462 {
3463  ReorderBufferTXN *txn;
3464 
3465  txn = ReorderBufferTXNByXid(rb, xid, false,
3466  NULL, InvalidXLogRecPtr, false);
3467 
3468  /* transaction isn't known yet, ergo no snapshot */
3469  if (txn == NULL)
3470  return false;
3471 
3472  /* a known subtxn? operate on top-level txn instead */
3473  if (rbtxn_is_known_subxact(txn))
3474  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3475  NULL, InvalidXLogRecPtr, false);
3476 
3477  return txn->base_snapshot != NULL;
3478 }

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 3444 of file reorderbuffer.c.

3445 {
3446  ReorderBufferTXN *txn;
3447 
3448  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3449  false);
3450  if (txn == NULL)
3451  return false;
3452 
3453  return rbtxn_has_catalog_changes(txn);
3454 }

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3371 of file reorderbuffer.c.

3373 {
3374  ReorderBufferTXN *txn;
3375 
3376  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3377 
3378  if (!rbtxn_has_catalog_changes(txn))
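3379  {
3380  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3381  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3382  }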
3383 
3384  /*
3385  * Mark top-level transaction as having catalog changes too if one of its
3386  * children has so that the ReorderBufferBuildTupleCidHash can
3387  * conveniently check just top-level transaction and decide whether to
3388  * build the hash table or not.
3389  */
3390  if (rbtxn_is_subtxn(txn))
3391  {
3392  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3393 
3394  if (!rbtxn_has_catalog_changes(toptxn))
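3395  {
3396  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3397  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3398  }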
3399  }
3400 }
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by heap_decode(), SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB * tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId * cmin,
CommandId * cmax 
)

Definition at line 5245 of file reorderbuffer.c.

5249 {
5250  ReorderBufferTupleCidKey key;
5251  ReorderBufferTupleCidEnt *ent;
5252  ForkNumber forkno;
5253  BlockNumber blockno;
5254  bool updated_mapping = false;
5255 
5256  /*
5257  * Return unresolved if tuplecid_data is not valid. That's because when
5258  * streaming in-progress transactions we may run into tuples with the CID
5259  * before actually decoding them. Think e.g. about INSERT followed by
5260  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5261  * INSERT. So in such cases, we assume the CID is from the future
5262  * command.
5263  */
5264  if (tuplecid_data == NULL)
5265  return false;
5266 
5267  /* be careful about padding */
5268  memset(&key, 0, sizeof(key));
5269 
5270  Assert(!BufferIsLocal(buffer));
5271 
5272  /*
5273  * get relfilelocator from the buffer, no convenient way to access it
5274  * other than that.
5275  */
5276  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5277 
5278  /* tuples can only be in the main fork */
5279  Assert(forkno == MAIN_FORKNUM);
5280  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5281 
5282  ItemPointerCopy(&htup->t_self,
5283  &key.tid);
5284 
5285 restart:
5286  ent = (ReorderBufferTupleCidEnt *)
5287  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5288 
5289  /*
5290  * failed to find a mapping, check whether the table was rewritten and
5291  * apply mapping if so, but only do that once - there can be no new
5292  * mappings while we are in here since we have to hold a lock on the
5293  * relation.
5294  */
5295  if (ent == NULL && !updated_mapping)
5296  {
5297  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5298  /* now check but don't update for a mapping again */
5299  updated_mapping = true;
5300  goto restart;
5301  }
5302  else if (ent == NULL)
5303  return false;
5304 
5305  if (cmin)
5306  *cmin = ent->cmin;
5307  if (cmax)
5308  *cmax = ent->cmax;
5309  return true;
5310 }
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:3688
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:48
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), sort-test::key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inline static

Definition at line 1945 of file reorderbuffer.c.

1946 {
1947  /*
1948  * If the input transaction id is already set as a CheckXidAlive then
1949  * nothing to do.
1950  */
1951  if (TransactionIdEquals(CheckXidAlive, xid))
1952  return;
1953 
1954  /*
1955  * setup CheckXidAlive if it's not committed yet. We don't check if the
1956  * xid is aborted. That will happen during catalog access.
1957  */
1958  if (!TransactionIdDidCommit(xid))
1959  CheckXidAlive = xid;
1960  else
1961  CheckXidAlive = InvalidTransactionId;
1962 }
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4625 of file reorderbuffer.c.

4626 {
4627  DIR *logical_dir;
4628  struct dirent *logical_de;
4629 
4630  logical_dir = AllocateDir("pg_replslot");
4631  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4632  {
4633  if (strcmp(logical_de->d_name, ".") == 0 ||
4634  strcmp(logical_de->d_name, "..") == 0)
4635  continue;
4636 
4637  /* if it cannot be a slot, skip the directory */
4638  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4639  continue;
4640 
4641  /*
4642  * ok, has to be a surviving logical slot, iterate and delete
4643  * everything starting with xid-*
4644  */
4645  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4646  }
4647  FreeDir(logical_dir);
4648 }
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2909
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId * xip,
Size  num 
)
static

Definition at line 5144 of file reorderbuffer.c.

5145 {
5146  return bsearch(&xid, xip, num,
5147  sizeof(TransactionId), xidComparator) != NULL;
5148 }

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB * tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 5167 of file reorderbuffer.c.

5168 {
5169  DIR *mapping_dir;
5170  struct dirent *mapping_de;
5171  List *files = NIL;
5172  ListCell *file;
5173  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5174 
5175  mapping_dir = AllocateDir("pg_logical/mappings");
5176  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5177  {
5178  Oid f_dboid;
5179  Oid f_relid;
5180  TransactionId f_mapped_xid;
5181  TransactionId f_create_xid;
5182  XLogRecPtr f_lsn;
5183  uint32 f_hi,
5184  f_lo;
5185  RewriteMappingFile *f;
5186 
5187  if (strcmp(mapping_de->d_name, ".") == 0 ||
5188  strcmp(mapping_de->d_name, "..") == 0)
5189  continue;
5190 
5191  /* Ignore files that aren't ours */
5192  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5193  continue;
5194 
5195  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5196  &f_dboid, &f_relid, &f_hi, &f_lo,
5197  &f_mapped_xid, &f_create_xid) != 6)
5198  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5199 
5200  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5201 
5202  /* mapping for another database */
5203  if (f_dboid != dboid)
5204  continue;
5205 
5206  /* mapping for another relation */
5207  if (f_relid != relid)
5208  continue;
5209 
5210  /* did the creating transaction abort? */
5211  if (!TransactionIdDidCommit(f_create_xid))
5212  continue;
5213 
5214  /* not for our transaction */
5215  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5216  continue;
5217 
5218  /* ok, relevant, queue for apply */
5219  f = palloc(sizeof(RewriteMappingFile));
5220  f->lsn = f_lsn;
5221  strcpy(f->fname, mapping_de->d_name);
5222  files = lappend(files, f);
5223  }
5224  FreeDir(mapping_dir);
5225 
5226  /* sort files so we apply them in LSN order */
5227  list_sort(files, file_sort_by_lsn);
5228 
5229  foreach(file, files)
5230  {
5231  RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5232 
5233  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5234  snapshot->subxip[0]);
5235  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5236  pfree(f);
5237  }
5238 }
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:243
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:91
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
List * lappend(List *list, void *datum)
Definition: list.c:339
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

Definition at line 216 of file reorderbuffer.c.

Referenced by pa_send_data(), and ReorderBufferCheckMemoryLimit().

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 212 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 213 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().