PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 189 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
( \
)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:56

Definition at line 184 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 180 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB * tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5014 of file reorderbuffer.c.

5015 {
5016  char path[MAXPGPATH];
5017  int fd;
5018  int readBytes;
5020 
5021  sprintf(path, "pg_logical/mappings/%s", fname);
5022  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5023  if (fd < 0)
5024  ereport(ERROR,
5026  errmsg("could not open file \"%s\": %m", path)));
5027 
5028  while (true)
5029  {
5032  ReorderBufferTupleCidEnt *new_ent;
5033  bool found;
5034 
5035  /* be careful about padding */
5036  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5037 
5038  /* read all mappings till the end of the file */
5039  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5040  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5042 
5043  if (readBytes < 0)
5044  ereport(ERROR,
5046  errmsg("could not read file \"%s\": %m",
5047  path)));
5048  else if (readBytes == 0) /* EOF */
5049  break;
5050  else if (readBytes != sizeof(LogicalRewriteMappingData))
5051  ereport(ERROR,
5053  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5054  path, readBytes,
5055  (int32) sizeof(LogicalRewriteMappingData))));
5056 
5057  key.rlocator = map.old_locator;
5058  ItemPointerCopy(&map.old_tid,
5059  &key.tid);
5060 
5061 
5062  ent = (ReorderBufferTupleCidEnt *)
5064 
5065  /* no existing mapping, no need to update */
5066  if (!ent)
5067  continue;
5068 
5069  key.rlocator = map.new_locator;
5070  ItemPointerCopy(&map.new_tid,
5071  &key.tid);
5072 
5073  new_ent = (ReorderBufferTupleCidEnt *)
5075 
5076  if (found)
5077  {
5078  /*
5079  * Make sure the existing mapping makes sense. We sometime update
5080  * old records that did not yet have a cmax (e.g. pg_class' own
5081  * entry while rewriting it) during rewrites, so allow that.
5082  */
5083  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5084  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5085  }
5086  else
5087  {
5088  /* update mapping */
5089  new_ent->cmin = ent->cmin;
5090  new_ent->cmax = ent->cmax;
5091  new_ent->combocid = ent->combocid;
5092  }
5093  }
5094 
5095  if (CloseTransientFile(fd) != 0)
5096  ereport(ERROR,
5098  errmsg("could not close file \"%s\": %m", path)));
5099 }
#define InvalidCommandId
Definition: c.h:656
signed int int32
Definition: c.h:481
#define PG_BINARY
Definition: c.h:1260
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:882
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static HTAB * tuplecid_data
Definition: snapmgr.c:102
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN * txn)
static

Definition at line 974 of file reorderbuffer.c.

975 {
976 #ifdef USE_ASSERT_CHECKING
977  dlist_iter iter;
978  XLogRecPtr prev_lsn = txn->first_lsn;
979 
980  dlist_foreach(iter, &txn->changes)
981  {
982  ReorderBufferChange *cur_change;
983 
984  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
985 
987  Assert(cur_change->lsn != InvalidXLogRecPtr);
988  Assert(txn->first_lsn <= cur_change->lsn);
989 
990  if (txn->end_lsn != InvalidXLogRecPtr)
991  Assert(cur_change->lsn <= txn->end_lsn);
992 
993  Assert(prev_lsn <= cur_change->lsn);
994 
995  prev_lsn = cur_change->lsn;
996  }
997 #endif
998 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer * rb)
static

Definition at line 903 of file reorderbuffer.c.

904 {
905 #ifdef USE_ASSERT_CHECKING
907  dlist_iter iter;
908  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
909  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
910 
911  /*
912  * Skip the verification if we don't reach the LSN at which we start
913  * decoding the contents of transactions yet because until we reach the
914  * LSN, we could have transactions that don't have the association between
915  * the top-level transaction and subtransaction yet and consequently have
916  * the same LSN. We don't guarantee this association until we try to
917  * decode the actual contents of transaction. The ordering of the records
918  * prior to the start_decoding_at LSN should have been checked before the
919  * restart.
920  */
922  return;
923 
924  dlist_foreach(iter, &rb->toplevel_by_lsn)
925  {
927  iter.cur);
928 
929  /* start LSN must be set */
930  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
931 
932  /* If there is an end LSN, it must be higher than start LSN */
933  if (cur_txn->end_lsn != InvalidXLogRecPtr)
934  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
935 
936  /* Current initial LSN must be strictly higher than previous */
937  if (prev_first_lsn != InvalidXLogRecPtr)
938  Assert(prev_first_lsn < cur_txn->first_lsn);
939 
940  /* known-as-subtxn txns must not be listed */
941  Assert(!rbtxn_is_known_subxact(cur_txn));
942 
943  prev_first_lsn = cur_txn->first_lsn;
944  }
945 
947  {
949  base_snapshot_node,
950  iter.cur);
951 
952  /* base snapshot (and its LSN) must be set */
953  Assert(cur_txn->base_snapshot != NULL);
955 
956  /* current LSN must be strictly higher than previous */
957  if (prev_base_snap_lsn != InvalidXLogRecPtr)
958  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
959 
960  /* known-as-subtxn txns must not be listed */
961  Assert(!rbtxn_is_known_subxact(cur_txn));
962 
963  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
964  }
965 #endif
966 }
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:433
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell * a_p,
const ListCell * b_p 
)
static

Definition at line 5116 of file reorderbuffer.c.

5117 {
5120 
5121  return pg_cmp_u64(a->lsn, b->lsn);
5122 }
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:501
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2887 of file reorderbuffer.c.

2889 {
2890  ReorderBufferTXN *txn;
2891 
2892  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2893  false);
2894 
2895  /* unknown, nothing to remove */
2896  if (txn == NULL)
2897  return;
2898 
2899  txn->xact_time.abort_time = abort_time;
2900 
2901  /* For streamed transactions notify the remote node about the abort. */
2902  if (rbtxn_is_streamed(txn))
2903  {
2904  rb->stream_abort(rb, txn, lsn);
2905 
2906  /*
2907  * We might have decoded changes for this transaction that could load
2908  * the cache as per the current transaction's view (consider DDL's
2909  * happened in this transaction). We don't want the decoding of future
2910  * transactions to use those cache entries so execute invalidations.
2911  */
2912  if (txn->ninvalidations > 0)
2914  txn->invalidations);
2915  }
2916 
2917  /* cosmetic... */
2918  txn->final_lsn = lsn;
2919 
2920  /* remove potential on-disk data, and deallocate */
2921  ReorderBufferCleanupTXN(rb, txn);
2922 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@110 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 2932 of file reorderbuffer.c.

2933 {
2934  dlist_mutable_iter it;
2935 
2936  /*
2937  * Iterate through all (potential) toplevel TXNs and abort all that are
2938  * older than what possibly can be running. Once we've found the first
2939  * that is alive we stop, there might be some that acquired an xid earlier
2940  * but started writing later, but it's unlikely and they will be cleaned
2941  * up in a later call to this function.
2942  */
2944  {
2945  ReorderBufferTXN *txn;
2946 
2947  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2948 
2949  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2950  {
2951  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2952 
2953  /* Notify the remote node about the crash/immediate restart. */
2954  if (rbtxn_is_streamed(txn))
2955  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2956 
2957  /* remove potential on-disk data, and deallocate this tx */
2958  ReorderBufferCleanupTXN(rb, txn);
2959  }
2960  else
2961  return;
2962  }
2963 }
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:224
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3265 of file reorderbuffer.c.

3268 {
3269  ReorderBufferTXN *txn;
3270  MemoryContext oldcontext;
3271  ReorderBufferChange *change;
3272 
3273  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3274 
3275  oldcontext = MemoryContextSwitchTo(rb->context);
3276 
3277  /*
3278  * Collect all the invalidations under the top transaction, if available,
3279  * so that we can execute them all together. See comments atop this
3280  * function.
3281  */
3282  txn = rbtxn_get_toptxn(txn);
3283 
3284  Assert(nmsgs > 0);
3285 
3286  /* Accumulate invalidations. */
3287  if (txn->ninvalidations == 0)
3288  {
3289  txn->ninvalidations = nmsgs;
3291  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3292  memcpy(txn->invalidations, msgs,
3293  sizeof(SharedInvalidationMessage) * nmsgs);
3294  }
3295  else
3296  {
3299  (txn->ninvalidations + nmsgs));
3300 
3301  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3302  nmsgs * sizeof(SharedInvalidationMessage));
3303  txn->ninvalidations += nmsgs;
3304  }
3305 
3306  change = ReorderBufferGetChange(rb);
3308  change->data.inval.ninvalidations = nmsgs;
3309  change->data.inval.invalidations = (SharedInvalidationMessage *)
3310  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3311  memcpy(change->data.inval.invalidations, msgs,
3312  sizeof(SharedInvalidationMessage) * nmsgs);
3313 
3314  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3315 
3316  MemoryContextSwitchTo(oldcontext);
3317 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1528
void * palloc(Size size)
Definition: mcxt.c:1304
MemoryContextSwitchTo(old_ctx)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:50
ReorderBufferChangeType action
Definition: reorderbuffer.h:75
struct ReorderBufferChange::@104::@109 inval
union ReorderBufferChange::@104 data
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3228 of file reorderbuffer.c.

3232 {
3234  ReorderBufferTXN *txn;
3235 
3236  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3237 
3238  change->data.tuplecid.locator = locator;
3239  change->data.tuplecid.tid = tid;
3240  change->data.tuplecid.cmin = cmin;
3241  change->data.tuplecid.cmax = cmax;
3242  change->data.tuplecid.combocid = combocid;
3243  change->lsn = lsn;
3244  change->txn = txn;
3246 
3247  dlist_push_tail(&txn->tuplecids, &change->node);
3248  txn->ntuplecids++;
3249 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:53
struct ReorderBufferChange::@104::@108 tuplecid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:78
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 303 of file reorderbuffer.c.

304 {
305  ReorderBuffer *buffer;
306  HASHCTL hash_ctl;
307  MemoryContext new_ctx;
308 
309  Assert(MyReplicationSlot != NULL);
310 
311  /* allocate memory in own context, to have better accountability */
313  "ReorderBuffer",
315 
316  buffer =
317  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
318 
319  memset(&hash_ctl, 0, sizeof(hash_ctl));
320 
321  buffer->context = new_ctx;
322 
323  buffer->change_context = SlabContextCreate(new_ctx,
324  "Change",
326  sizeof(ReorderBufferChange));
327 
328  buffer->txn_context = SlabContextCreate(new_ctx,
329  "TXN",
331  sizeof(ReorderBufferTXN));
332 
333  /*
334  * XXX the allocation sizes used below pre-date generation context's block
335  * growing code. These values should likely be benchmarked and set to
336  * more suitable values.
337  */
338  buffer->tup_context = GenerationContextCreate(new_ctx,
339  "Tuples",
343 
344  hash_ctl.keysize = sizeof(TransactionId);
345  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
346  hash_ctl.hcxt = buffer->context;
347 
348  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
350 
352  buffer->by_txn_last_txn = NULL;
353 
354  buffer->outbuf = NULL;
355  buffer->outbufsize = 0;
356  buffer->size = 0;
357 
358  buffer->spillTxns = 0;
359  buffer->spillCount = 0;
360  buffer->spillBytes = 0;
361  buffer->streamTxns = 0;
362  buffer->streamCount = 0;
363  buffer->streamBytes = 0;
364  buffer->totalTxns = 0;
365  buffer->totalBytes = 0;
366 
368 
369  dlist_init(&buffer->toplevel_by_lsn);
371  dclist_init(&buffer->catchange_txns);
372 
373  /*
374  * Ensure there's no stale data from prior uses of this slot, in case some
375  * prior exit avoided calling ReorderBufferFree. Failure to do this can
376  * produce duplicated txns, and it's very cheap if there's nothing there.
377  */
379 
380  return buffer;
381 }
#define NameStr(name)
Definition: c.h:733
uint32 TransactionId
Definition: c.h:639
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1168
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:182
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:183
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:178
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
Relation  relation,
ReorderBufferChange *change,
bool  streaming 
)
inline static

Definition at line 1953 of file reorderbuffer.c.

1956 {
1957  if (streaming)
1958  rb->stream_change(rb, txn, relation, change);
1959  else
1960  rb->apply_change(rb, txn, relation, change);
1961 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
ReorderBufferChange *change,
bool  streaming 
)
inline static

Definition at line 1981 of file reorderbuffer.c.

1983 {
1984  if (streaming)
1985  rb->stream_message(rb, txn, change->lsn, true,
1986  change->data.msg.prefix,
1987  change->data.msg.message_size,
1988  change->data.msg.message);
1989  else
1990  rb->message(rb, txn, change->lsn, true,
1991  change->data.msg.prefix,
1992  change->data.msg.message_size,
1993  change->data.msg.message);
1994 }
struct ReorderBufferChange::@104::@107 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *rb,
ReorderBufferTXN *txn,
int  nrelations,
Relation *relations,
ReorderBufferChange *change,
bool  streaming 
)
inline static

Definition at line 1967 of file reorderbuffer.c.

1970 {
1971  if (streaming)
1972  rb->stream_truncate(rb, txn, nrelations, relations, change);
1973  else
1974  rb->apply_truncate(rb, txn, nrelations, relations, change);
1975 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1060 of file reorderbuffer.c.

1062 {
1063  ReorderBufferTXN *txn;
1064  ReorderBufferTXN *subtxn;
1065  bool new_top;
1066  bool new_sub;
1067 
1068  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1069  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1070 
1071  if (!new_sub)
1072  {
1073  if (rbtxn_is_known_subxact(subtxn))
1074  {
1075  /* already associated, nothing to do */
1076  return;
1077  }
1078  else
1079  {
1080  /*
1081  * We already saw this transaction, but initially added it to the
1082  * list of top-level txns. Now that we know it's not top-level,
1083  * remove it from there.
1084  */
1085  dlist_delete(&subtxn->node);
1086  }
1087  }
1088 
1089  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1090  subtxn->toplevel_xid = xid;
1091  Assert(subtxn->nsubtxns == 0);
1092 
1093  /* set the reference to top-level transaction */
1094  subtxn->toptxn = txn;
1095 
1096  /* add to subtransaction list */
1097  dlist_push_tail(&txn->subtxns, &subtxn->node);
1098  txn->nsubtxns++;
1099 
1100  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1101  ReorderBufferTransferSnapToParent(txn, subtxn);
1102 
1103  /* Verify LSN-ordering invariant */
1104  AssertTXNLsnOrder(rb);
1105 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *rb,
ReorderBufferTXN *txn 
)
static

Definition at line 1719 of file reorderbuffer.c.

1720 {
1721  dlist_iter iter;
1722  HASHCTL hash_ctl;
1723 
1725  return;
1726 
1727  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1728  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1729  hash_ctl.hcxt = rb->context;
1730 
1731  /*
1732  * create the hash with the exact number of to-be-stored tuplecids from
1733  * the start
1734  */
1735  txn->tuplecid_hash =
1736  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1738 
1739  dlist_foreach(iter, &txn->tuplecids)
1740  {
1743  bool found;
1744  ReorderBufferChange *change;
1745 
1746  change = dlist_container(ReorderBufferChange, node, iter.cur);
1747 
1749 
1750  /* be careful about padding */
1751  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1752 
1753  key.rlocator = change->data.tuplecid.locator;
1754 
1755  ItemPointerCopy(&change->data.tuplecid.tid,
1756  &key.tid);
1757 
1758  ent = (ReorderBufferTupleCidEnt *)
1759  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1760  if (!found)
1761  {
1762  ent->cmin = change->data.tuplecid.cmin;
1763  ent->cmax = change->data.tuplecid.cmax;
1764  ent->combocid = change->data.tuplecid.combocid;
1765  }
1766  else
1767  {
1768  /*
1769  * Maybe we already saw this tuple before in this transaction, but
1770  * if so it must have the same cmin.
1771  */
1772  Assert(ent->cmin == change->data.tuplecid.cmin);
1773 
1774  /*
1775  * cmax may be initially invalid, but once set it can only grow,
1776  * and never become invalid again.
1777  */
1778  Assert((ent->cmax == InvalidCommandId) ||
1779  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1780  (change->data.tuplecid.cmax > ent->cmax)));
1781  ent->cmax = change->data.tuplecid.cmax;
1782  }
1783  }
1784 }
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *rb)
inline static

Definition at line 3962 of file reorderbuffer.c.

3963 {
3965  SnapBuild *builder = ctx->snapshot_builder;
3966 
3967  /* We can't start streaming unless a consistent state is reached. */
3969  return false;
3970 
3971  /*
3972  * We can't start streaming immediately even if the streaming is enabled
3973  * because we previously decoded this transaction and now just are
3974  * restarting.
3975  */
3976  if (ReorderBufferCanStream(rb) &&
3977  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
3978  return true;
3979 
3980  return false;
3981 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:406
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *rb)
inline static

Definition at line 3953 of file reorderbuffer.c.

3954 {
3956 
3957  return ctx->streaming;
3958 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *rb,
ReorderBufferChange *change,
bool  addition,
Size  sz 
)
static

Definition at line 3174 of file reorderbuffer.c.

3177 {
3178  ReorderBufferTXN *txn;
3179  ReorderBufferTXN *toptxn;
3180 
3181  Assert(change->txn);
3182 
3183  /*
3184  * Ignore tuple CID changes, because those are not evicted when reaching
3185  * memory limit. So we just don't count them, because it might easily
3186  * trigger a pointless attempt to spill.
3187  */
3189  return;
3190 
3191  txn = change->txn;
3192 
3193  /*
3194  * Update the total size in top level as well. This is later used to
3195  * compute the decoding stats.
3196  */
3197  toptxn = rbtxn_get_toptxn(txn);
3198 
3199  if (addition)
3200  {
3201  txn->size += sz;
3202  rb->size += sz;
3203 
3204  /* Update the total size in the top transaction. */
3205  toptxn->total_size += sz;
3206  }
3207  else
3208  {
3209  Assert((rb->size >= sz) && (txn->size >= sz));
3210  txn->size -= sz;
3211  rb->size -= sz;
3212 
3213  /* Update the total size in the top transaction. */
3214  toptxn->total_size -= sz;
3215  }
3216 
3217  Assert(txn->size <= rb->size);
3218 }

References ReorderBufferChange::action, Assert(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *change)
static

Definition at line 4105 of file reorderbuffer.c.

4106 {
4107  Size sz = sizeof(ReorderBufferChange);
4108 
4109  switch (change->action)
4110  {
4111  /* fall through these, they're all similar enough */
4116  {
4117  HeapTuple oldtup,
4118  newtup;
4119  Size oldlen = 0;
4120  Size newlen = 0;
4121 
4122  oldtup = change->data.tp.oldtuple;
4123  newtup = change->data.tp.newtuple;
4124 
4125  if (oldtup)
4126  {
4127  sz += sizeof(HeapTupleData);
4128  oldlen = oldtup->t_len;
4129  sz += oldlen;
4130  }
4131 
4132  if (newtup)
4133  {
4134  sz += sizeof(HeapTupleData);
4135  newlen = newtup->t_len;
4136  sz += newlen;
4137  }
4138 
4139  break;
4140  }
4142  {
4143  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4144 
4145  sz += prefix_size + change->data.msg.message_size +
4146  sizeof(Size) + sizeof(Size);
4147 
4148  break;
4149  }
4151  {
4152  sz += sizeof(SharedInvalidationMessage) *
4153  change->data.inval.ninvalidations;
4154  break;
4155  }
4157  {
4158  Snapshot snap;
4159 
4160  snap = change->data.snapshot;
4161 
4162  sz += sizeof(SnapshotData) +
4163  sizeof(TransactionId) * snap->xcnt +
4164  sizeof(TransactionId) * snap->subxcnt;
4165 
4166  break;
4167  }
4169  {
4170  sz += sizeof(Oid) * change->data.truncate.nrelids;
4171 
4172  break;
4173  }
4178  /* ReorderBufferChange contains everything important */
4179  break;
4180  }
4181 
4182  return sz;
4183 }
size_t Size
Definition: c.h:592
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:49
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:48
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@104::@105 tp
struct ReorderBufferChange::@104::@106 truncate
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *rb)
static

Definition at line 3574 of file reorderbuffer.c.

3575 {
3576  ReorderBufferTXN *txn;
3577 
3578  /*
3579  * Bail out if debug_logical_replication_streaming is buffered and we
3580  * haven't exceeded the memory limit.
3581  */
3583  rb->size < logical_decoding_work_mem * 1024L)
3584  return;
3585 
3586  /*
3587  * If debug_logical_replication_streaming is immediate, loop until there's
3588  * no change. Otherwise, loop until we reach under the memory limit. One
3589  * might think that just by evicting the largest (sub)transaction we will
3590  * come under the memory limit based on assumption that the selected
3591  * transaction is at least as large as the most recent change (which
3592  * caused us to go over the memory limit). However, that is not true
3593  * because a user can reduce the logical_decoding_work_mem to a smaller
3594  * value before the most recent change.
3595  */
3596  while (rb->size >= logical_decoding_work_mem * 1024L ||
3598  rb->size > 0))
3599  {
3600  /*
3601  * Pick the largest transaction and evict it from memory by streaming,
3602  * if possible. Otherwise, spill to disk.
3603  */
3605  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3606  {
3607  /* we know there has to be one, because the size is not zero */
3608  Assert(txn && rbtxn_is_toptxn(txn));
3609  Assert(txn->total_size > 0);
3610  Assert(rb->size >= txn->total_size);
3611 
3612  ReorderBufferStreamTXN(rb, txn);
3613  }
3614  else
3615  {
3616  /*
3617  * Pick the largest transaction (or subtransaction) and evict it
3618  * from memory by serializing it to disk.
3619  */
3620  txn = ReorderBufferLargestTXN(rb);
3621 
3622  /* we know there has to be one, because the size is not zero */
3623  Assert(txn);
3624  Assert(txn->size > 0);
3625  Assert(rb->size >= txn->size);
3626 
3627  ReorderBufferSerializeTXN(rb, txn);
3628  }
3629 
3630  /*
3631  * After eviction, the transaction should have no entries in memory,
3632  * and should use 0 bytes for changes.
3633  */
3634  Assert(txn->size == 0);
3635  Assert(txn->nentries_mem == 0);
3636  }
3637 
3638  /* We must be under the memory limit now. */
3639  Assert(rb->size < logical_decoding_work_mem * 1024L);
3640 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:28
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:27
#define rbtxn_is_toptxn(txn)

References Assert(), DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4531 of file reorderbuffer.c.

4532 {
4533  DIR *spill_dir;
4534  struct dirent *spill_de;
4535  struct stat statbuf;
4536  char path[MAXPGPATH * 2 + 12];
4537 
4538  sprintf(path, "pg_replslot/%s", slotname);
4539 
4540  /* we're only handling directories here, skip if it's not ours */
4541  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4542  return;
4543 
4544  spill_dir = AllocateDir(path);
4545  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4546  {
4547  /* only look at names that can be ours */
4548  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4549  {
4550  snprintf(path, sizeof(path),
4551  "pg_replslot/%s/%s", slotname,
4552  spill_de->d_name);
4553 
4554  if (unlink(path) != 0)
4555  ereport(ERROR,
4557  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4558  path, slotname)));
4559  }
4560  }
4561  FreeDir(spill_dir);
4562 }
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2961
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2924
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2843
#define snprintf
Definition: port.h:238
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *rb,
ReorderBufferTXN *txn 
)
static

Definition at line 1496 of file reorderbuffer.c.

1497 {
1498  bool found;
1499  dlist_mutable_iter iter;
1500 
1501  /* cleanup subtransactions & their changes */
1502  dlist_foreach_modify(iter, &txn->subtxns)
1503  {
1504  ReorderBufferTXN *subtxn;
1505 
1506  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1507 
1508  /*
1509  * Subtransactions are always associated to the toplevel TXN, even if
1510  * they originally were happening inside another subtxn, so we won't
1511  * ever recurse more than one level deep here.
1512  */
1513  Assert(rbtxn_is_known_subxact(subtxn));
1514  Assert(subtxn->nsubtxns == 0);
1515 
1516  ReorderBufferCleanupTXN(rb, subtxn);
1517  }
1518 
1519  /* cleanup changes in the txn */
1520  dlist_foreach_modify(iter, &txn->changes)
1521  {
1522  ReorderBufferChange *change;
1523 
1524  change = dlist_container(ReorderBufferChange, node, iter.cur);
1525 
1526  /* Check we're not mixing changes from different transactions. */
1527  Assert(change->txn == txn);
1528 
1529  ReorderBufferReturnChange(rb, change, true);
1530  }
1531 
1532  /*
1533  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1534  * They are always stored in the toplevel transaction.
1535  */
1536  dlist_foreach_modify(iter, &txn->tuplecids)
1537  {
1538  ReorderBufferChange *change;
1539 
1540  change = dlist_container(ReorderBufferChange, node, iter.cur);
1541 
1542  /* Check we're not mixing changes from different transactions. */
1543  Assert(change->txn == txn);
1545 
1546  ReorderBufferReturnChange(rb, change, true);
1547  }
1548 
1549  /*
1550  * Cleanup the base snapshot, if set.
1551  */
1552  if (txn->base_snapshot != NULL)
1553  {
1556  }
1557 
1558  /*
1559  * Cleanup the snapshot for the last streamed run.
1560  */
1561  if (txn->snapshot_now != NULL)
1562  {
1563  Assert(rbtxn_is_streamed(txn));
1565  }
1566 
1567  /*
1568  * Remove TXN from its containing lists.
1569  *
1570  * Note: if txn is known as subxact, we are deleting the TXN from its
1571  * parent's list of known subxacts; this leaves the parent's nsubxacts
1572  * count too high, but we don't care. Otherwise, we are deleting the TXN
1573  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1574  * list of catalog modifying transactions as well.
1575  */
1576  dlist_delete(&txn->node);
1577  if (rbtxn_has_catalog_changes(txn))
1579 
1580  /* now remove reference from buffer */
1581  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1582  Assert(found);
1583 
1584  /* remove entries spilled to disk */
1585  if (rbtxn_is_serialized(txn))
1586  ReorderBufferRestoreCleanup(rb, txn);
1587 
1588  /* deallocate */
1589  ReorderBufferReturnTXN(rb, txn);
1590 }
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:457
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2696 of file reorderbuffer.c.

2700 {
2701  ReorderBufferTXN *txn;
2702 
2703  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2704  false);
2705 
2706  /* unknown transaction, nothing to replay */
2707  if (txn == NULL)
2708  return;
2709 
2710  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2711  origin_id, origin_lsn);
2712 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1180 of file reorderbuffer.c.

1183 {
1184  ReorderBufferTXN *subtxn;
1185 
1186  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1187  InvalidXLogRecPtr, false);
1188 
1189  /*
1190  * No need to do anything if that subtxn didn't contain any changes
1191  */
1192  if (!subtxn)
1193  return;
1194 
1195  subtxn->final_lsn = commit_lsn;
1196  subtxn->end_lsn = end_lsn;
1197 
1198  /*
1199  * Assign this subxact as a child of the toplevel xact (no-op if already
1200  * done.)
1201  */
1202  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1203 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *rb,
Snapshot  orig_snap,
ReorderBufferTXN *txn,
CommandId  cid 
)
static

Definition at line 1792 of file reorderbuffer.c.

1794 {
1795  Snapshot snap;
1796  dlist_iter iter;
1797  int i = 0;
1798  Size size;
1799 
1800  size = sizeof(SnapshotData) +
1801  sizeof(TransactionId) * orig_snap->xcnt +
1802  sizeof(TransactionId) * (txn->nsubtxns + 1);
1803 
1804  snap = MemoryContextAllocZero(rb->context, size);
1805  memcpy(snap, orig_snap, sizeof(SnapshotData));
1806 
1807  snap->copied = true;
1808  snap->active_count = 1; /* mark as active so nobody frees it */
1809  snap->regd_count = 0;
1810  snap->xip = (TransactionId *) (snap + 1);
1811 
1812  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1813 
1814  /*
1815  * snap->subxip contains all txids that belong to our transaction which we
1816  * need to check via cmin/cmax. That's why we store the toplevel
1817  * transaction in there as well.
1818  */
1819  snap->subxip = snap->xip + snap->xcnt;
1820  snap->subxip[i++] = txn->xid;
1821 
1822  /*
1823  * subxcnt isn't decreased when subtransactions abort, so count manually.
1824  * Since it's an upper boundary it is safe to use it for the allocation
1825  * above.
1826  */
1827  snap->subxcnt = 1;
1828 
1829  dlist_foreach(iter, &txn->subtxns)
1830  {
1831  ReorderBufferTXN *sub_txn;
1832 
1833  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1834  snap->subxip[i++] = sub_txn->xid;
1835  snap->subxcnt++;
1836  }
1837 
1838  /* sort so we can bsearch() later */
1839  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1840 
1841  /* store the specified current CommandId */
1842  snap->curcid = cid;
1843 
1844  return snap;
1845 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1202
#define qsort(a, b, c, d)
Definition: port.h:449
static pg_noinline void Size size
Definition: slab.c:607
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, size, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage *msgs 
)
static

Definition at line 3324 of file reorderbuffer.c.

3325 {
3326  int i;
3327 
3328  for (i = 0; i < nmsgs; i++)
3330 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2802 of file reorderbuffer.c.

2807 {
2808  ReorderBufferTXN *txn;
2809  XLogRecPtr prepare_end_lsn;
2810  TimestampTz prepare_time;
2811 
2812  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2813 
2814  /* unknown transaction, nothing to do */
2815  if (txn == NULL)
2816  return;
2817 
2818  /*
2819  * By this time the txn has the prepare record information, remember it to
2820  * be later used for rollback.
2821  */
2822  prepare_end_lsn = txn->end_lsn;
2823  prepare_time = txn->xact_time.prepare_time;
2824 
2825  /* add the gid in the txn */
2826  txn->gid = pstrdup(gid);
2827 
2828  /*
2829  * It is possible that this transaction is not decoded at prepare time
2830  * either because by that time we didn't have a consistent snapshot, or
2831  * two_phase was not enabled, or it was decoded earlier but we have
2832  * restarted. We only need to send the prepare if it was not decoded
2833  * earlier. We don't need to decode the xact for aborts if it is not done
2834  * already.
2835  */
2836  if ((txn->final_lsn < two_phase_at) && is_commit)
2837  {
2838  txn->txn_flags |= RBTXN_PREPARE;
2839 
2840  /*
2841  * The prepare info must have been updated in txn even if we skip
2842  * prepare.
2843  */
2845 
2846  /*
2847  * By this time the txn has the prepare record information and it is
2848  * important to use that so that downstream gets the accurate
2849  * information. If instead, we have passed commit information here
2850  * then downstream can behave as it has already replayed commit
2851  * prepared after the restart.
2852  */
2853  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2854  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2855  }
2856 
2857  txn->final_lsn = commit_lsn;
2858  txn->end_lsn = end_lsn;
2859  txn->xact_time.commit_time = commit_time;
2860  txn->origin_id = origin_id;
2861  txn->origin_lsn = origin_lsn;
2862 
2863  if (is_commit)
2864  rb->commit_prepared(rb, txn, commit_lsn);
2865  else
2866  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2867 
2868  /* cleanup: make sure there's no cache pollution */
2870  txn->invalidations);
2871  ReorderBufferCleanupTXN(rb, txn);
2872 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1683
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2979 of file reorderbuffer.c.

2980 {
2981  ReorderBufferTXN *txn;
2982 
2983  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2984  false);
2985 
2986  /* unknown, nothing to forget */
2987  if (txn == NULL)
2988  return;
2989 
2990  /* this transaction mustn't be streamed */
2991  Assert(!rbtxn_is_streamed(txn));
2992 
2993  /* cosmetic... */
2994  txn->final_lsn = lsn;
2995 
2996  /*
2997  * Process cache invalidation messages if there are any. Even if we're not
2998  * interested in the transaction's contents, it could have manipulated the
2999  * catalog and we need to update the caches according to that.
3000  */
3001  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3002  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3003  txn->invalidations);
3004  else
3005  Assert(txn->ninvalidations == 0);
3006 
3007  /* remove potential on-disk data, and deallocate */
3008  ReorderBufferCleanupTXN(rb, txn);
3009 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * rb)

Definition at line 387 of file reorderbuffer.c.

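388 {
389  MemoryContext context = rb->context;
390 
391  /*
392  * We free separately allocated data by entirely scrapping reorderbuffer's
393  * memory context.
394  */
395  MemoryContextDelete(context);
396 
397  /* Free disk space used by unconsumed reorder buffers */
398  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
399 }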
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442
tree context
Definition: radixtree.h:1797

References context, ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer * rb,
Snapshot  snap 
)
static

Definition at line 1851 of file reorderbuffer.c.

1852 {
1853  if (snap->copied)
1854  pfree(snap);
1855  else
1856  SnapBuildSnapDecRefcount(snap);
1857 }
void pfree(void *pointer)
Definition: mcxt.c:1508

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId* ReorderBufferGetCatalogChangesXacts ( ReorderBuffer * rb)

Definition at line 3374 of file reorderbuffer.c.

3375 {
3376  dlist_iter iter;
3377  TransactionId *xids = NULL;
3378  size_t xcnt = 0;
3379 
3380  /* Quick return if the list is empty */
3381  if (dclist_count(&rb->catchange_txns) == 0)
3382  return NULL;
3383 
3384  /* Initialize XID array */
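3385  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3386  dclist_count(&rb->catchange_txns));
3387  dclist_foreach(iter, &rb->catchange_txns)
3388  {
3389  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3390  catchange_node,
3391  iter.cur);
3392 
3393  Assert(rbtxn_has_catalog_changes(txn));
3394 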
3395  xids[xcnt++] = txn->xid;
3396  }
3397 
3398  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3399 
3400  Assert(xcnt == dclist_count(&rb->catchange_txns));
3401  return xids;
3402 }
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer * rb)

Definition at line 1005 of file reorderbuffer.c.

1006 {
1007  ReorderBufferTXN *txn;
1008 
1009  AssertTXNLsnOrder(rb);
1010 
1011  if (dlist_is_empty(&rb->toplevel_by_lsn))
1012  return NULL;
1013 
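1014  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1015 
1016  Assert(!rbtxn_is_known_subxact(txn));
1017  Assert(txn->first_lsn != InvalidXLogRecPtr);
1018  return txn;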
1019 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb)

Definition at line 1033 of file reorderbuffer.c.

1034 {
1035  ReorderBufferTXN *txn;
1036 
1037  AssertTXNLsnOrder(rb);
1038 
1039  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1040  return InvalidTransactionId;
1041 
1042  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1043  &rb->txns_by_base_snapshot_lsn);
1044  return txn->base_snapshot->xmin;
1045 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer * rb,
int  nrelids 
)

Definition at line 586 of file reorderbuffer.c.

587 {
588  Oid *relids;
589  Size alloc_len;
590 
591  alloc_len = sizeof(Oid) * nrelids;
592 
593  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
594 
595  return relids;
596 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer * rb,
Size  tuple_len 
)

Definition at line 553 of file reorderbuffer.c.

554 {
555  HeapTuple tuple;
556  Size alloc_len;
557 
558  alloc_len = tuple_len + SizeofHeapTupleHeader;
559 
560  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
561  HEAPTUPLESIZE + alloc_len);
562  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
563 
564  return tuple;
565 }
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer * rb)
static

Definition at line 405 of file reorderbuffer.c.

406 {
407  ReorderBufferTXN *txn;
408 
409  txn = (ReorderBufferTXN *)
410  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
411 
412  memset(txn, 0, sizeof(ReorderBufferTXN));
413 
414  dlist_init(&txn->changes);
415  dlist_init(&txn->tuplecids);
416  dlist_init(&txn->subtxns);
417 
418  /* InvalidCommandId is not zero, so set it explicitly */
419  txn->command_id = InvalidCommandId;
420  txn->output_plugin_private = NULL;
421 
422  return txn;
423 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * rb,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 3051 of file reorderbuffer.c.

3053 {
3054  bool use_subtxn = IsTransactionOrTransactionBlock();
3055  int i;
3056 
3057  if (use_subtxn)
3058  BeginInternalSubTransaction("replay");
3059 
3060  /*
3061  * Force invalidations to happen outside of a valid transaction - that way
3062  * entries will just be marked as invalid without accessing the catalog.
3063  * That's advantageous because we don't need to setup the full state
3064  * necessary for catalog access.
3065  */
3066  if (use_subtxn)
3067  AbortCurrentTransaction();
3068 
3069  for (i = 0; i < ninvalidations; i++)
3070  LocalExecuteInvalidationMessage(&invalidations[i]);
3071 
3072  if (use_subtxn)
3073  RollbackAndReleaseCurrentSubTransaction();
3074 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4946
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4656
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4758
void AbortCurrentTransaction(void)
Definition: xact.c:3391

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3020 of file reorderbuffer.c.

3021 {
3022  ReorderBufferTXN *txn;
3023 
3024  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3025  false);
3026 
3027  /* unknown, nothing to do */
3028  if (txn == NULL)
3029  return;
3030 
3031  /*
3032  * Process cache invalidation messages if there are any. Even if we're not
3033  * interested in the transaction's contents, it could have manipulated the
3034  * catalog and we need to update the caches according to that.
3035  */
3036  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3037  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3038  txn->invalidations);
3039  else
3040  Assert(txn->ninvalidations == 0);
3041 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1222 of file reorderbuffer.c.

1223 {
1224  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1225  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1226  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1227 
1228  if (pos_a < pos_b)
1229  return 1;
1230  else if (pos_a == pos_b)
1231  return 0;
1232  return -1;
1233 }
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer * rb,
ReorderBufferIterTXNState * state 
)
static

Definition at line 1465 of file reorderbuffer.c.

1467 {
1468  int32 off;
1469 
1470  for (off = 0; off < state->nr_txns; off++)
1471  {
1472  if (state->entries[off].file.vfd != -1)
1473  FileClose(state->entries[off].file.vfd);
1474  }
1475 
1476  /* free memory we might have "leaked" in the last *Next call */
1477  if (!dlist_is_empty(&state->old_change))
1478  {
1479  ReorderBufferChange *change;
1480 
1481  change = dlist_container(ReorderBufferChange, node,
1482  dlist_pop_head_node(&state->old_change));
1483  ReorderBufferReturnChange(rb, change, true);
1484  Assert(dlist_is_empty(&state->old_change));
1485  }
1486 
1487  binaryheap_free(state->heap);
1488  pfree(state);
1489 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1978
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1245 of file reorderbuffer.c.

1247 {
1248  Size nr_txns = 0;
1249  ReorderBufferIterTXNState *state;
1250  dlist_iter cur_txn_i;
1251  int32 off;
1252 
1253  *iter_state = NULL;
1254 
1255  /* Check ordering of changes in the toplevel transaction. */
1256  AssertChangeLsnOrder(txn);
1257 
1258  /*
1259  * Calculate the size of our heap: one element for every transaction that
1260  * contains changes. (Besides the transactions already in the reorder
1261  * buffer, we count the one we were directly passed.)
1262  */
1263  if (txn->nentries > 0)
1264  nr_txns++;
1265 
1266  dlist_foreach(cur_txn_i, &txn->subtxns)
1267  {
1268  ReorderBufferTXN *cur_txn;
1269 
1270  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1271 
1272  /* Check ordering of changes in this subtransaction. */
1273  AssertChangeLsnOrder(cur_txn);
1274 
1275  if (cur_txn->nentries > 0)
1276  nr_txns++;
1277  }
1278 
1279  /* allocate iteration state */
1280  state = (ReorderBufferIterTXNState *)
1281  MemoryContextAllocZero(rb->context,
1282  sizeof(ReorderBufferIterTXNState) +
1283  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1284 
1285  state->nr_txns = nr_txns;
1286  dlist_init(&state->old_change);
1287 
1288  for (off = 0; off < state->nr_txns; off++)
1289  {
1290  state->entries[off].file.vfd = -1;
1291  state->entries[off].segno = 0;
1292  }
1293 
1294  /* allocate heap */
1295  state->heap = binaryheap_allocate(state->nr_txns,
1296  ReorderBufferIterCompare,
1297  state);
1298 
1299  /* Now that the state fields are initialized, it is safe to return it. */
1300  *iter_state = state;
1301 
1302  /*
1303  * Now insert items into the binary heap, in an unordered fashion. (We
1304  * will run a heap assembly step at the end; this is more efficient.)
1305  */
1306 
1307  off = 0;
1308 
1309  /* add toplevel transaction if it contains changes */
1310  if (txn->nentries > 0)
1311  {
1312  ReorderBufferChange *cur_change;
1313 
1314  if (rbtxn_is_serialized(txn))
1315  {
1316  /* serialize remaining changes */
1317  ReorderBufferSerializeTXN(rb, txn);
1318  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1319  &state->entries[off].segno);
1320  }
1321 
1322  cur_change = dlist_head_element(ReorderBufferChange, node,
1323  &txn->changes);
1324 
1325  state->entries[off].lsn = cur_change->lsn;
1326  state->entries[off].change = cur_change;
1327  state->entries[off].txn = txn;
1328 
1329  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1330  }
1331 
1332  /* add subtransactions if they contain changes */
1333  dlist_foreach(cur_txn_i, &txn->subtxns)
1334  {
1335  ReorderBufferTXN *cur_txn;
1336 
1337  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1338 
1339  if (cur_txn->nentries > 0)
1340  {
1341  ReorderBufferChange *cur_change;
1342 
1343  if (rbtxn_is_serialized(cur_txn))
1344  {
1345  /* serialize remaining changes */
1346  ReorderBufferSerializeTXN(rb, cur_txn);
1347  ReorderBufferRestoreChanges(rb, cur_txn,
1348  &state->entries[off].file,
1349  &state->entries[off].segno);
1350  }
1351  cur_change = dlist_head_element(ReorderBufferChange, node,
1352  &cur_txn->changes);
1353 
1354  state->entries[off].lsn = cur_change->lsn;
1355  state->entries[off].change = cur_change;
1356  state->entries[off].txn = cur_txn;
1357 
1358  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1359  }
1360  }
1361 
1362  /* assemble a valid binary heap */
1363  binaryheap_build(state->heap);
1364 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer * rb,
ReorderBufferIterTXNState * state 
)
static

Definition at line 1373 of file reorderbuffer.c.

1374 {
1375  ReorderBufferChange *change;
1376  ReorderBufferIterTXNEntry *entry;
1377  int32 off;
1378 
1379  /* nothing there anymore */
1380  if (state->heap->bh_size == 0)
1381  return NULL;
1382 
1383  off = DatumGetInt32(binaryheap_first(state->heap));
1384  entry = &state->entries[off];
1385 
1386  /* free memory we might have "leaked" in the previous *Next call */
1387  if (!dlist_is_empty(&state->old_change))
1388  {
1389  change = dlist_container(ReorderBufferChange, node,
1390  dlist_pop_head_node(&state->old_change));
1391  ReorderBufferReturnChange(rb, change, true);
1392  Assert(dlist_is_empty(&state->old_change));
1393  }
1394 
1395  change = entry->change;
1396 
1397  /*
1398  * update heap with information about which transaction has the next
1399  * relevant change in LSN order
1400  */
1401 
1402  /* there are in-memory changes */
1403  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1404  {
1405  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1406  ReorderBufferChange *next_change =
1407  dlist_container(ReorderBufferChange, node, next);
1408 
1409  /* txn stays the same */
1410  state->entries[off].lsn = next_change->lsn;
1411  state->entries[off].change = next_change;
1412 
1413  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1414  return change;
1415  }
1416 
1417  /* try to load changes from disk */
1418  if (entry->txn->nentries != entry->txn->nentries_mem)
1419  {
1420  /*
1421  * Ugly: restoring changes will reuse *Change records, thus delete the
1422  * current one from the per-tx list and only free in the next call.
1423  */
1424  dlist_delete(&change->node);
1425  dlist_push_tail(&state->old_change, &change->node);
1426 
1427  /*
1428  * Update the total bytes processed by the txn for which we are
1429  * releasing the current set of changes and restoring the new set of
1430  * changes.
1431  */
1432  rb->totalBytes += entry->txn->size;
1433  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1434  &state->entries[off].segno))
1435  {
1436  /* successfully restored changes from disk */
1437  ReorderBufferChange *next_change =
1438  dlist_head_element(ReorderBufferChange, node,
1439  &entry->txn->changes);
1440 
1441  elog(DEBUG2, "restored %u/%u changes from disk",
1442  (uint32) entry->txn->nentries_mem,
1443  (uint32) entry->txn->nentries);
1444 
1445  Assert(entry->txn->nentries_mem);
1446  /* txn stays the same */
1447  state->entries[off].lsn = next_change->lsn;
1448  state->entries[off].change = next_change;
1449  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1450 
1451  return change;
1452  }
1453  }
1454 
1455  /* ok, no changes there anymore, remove */
1456  binaryheap_remove_first(state->heap);
1457 
1458  return change;
1459 }
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:221
unsigned int uint32
Definition: c.h:493
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN* ReorderBufferLargestStreamableTopTXN ( ReorderBuffer * rb)
static

Definition at line 3530 of file reorderbuffer.c.

3531 {
3532  dlist_iter iter;
3533  Size largest_size = 0;
3534  ReorderBufferTXN *largest = NULL;
3535 
3536  /* Find the largest top-level transaction having a base snapshot. */
3537  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3538  {
3539  ReorderBufferTXN *txn;
3540 
3541  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3542 
3543  /* must not be a subtxn */
3544  Assert(!rbtxn_is_known_subxact(txn));
3545  /* base_snapshot must be set */
3546  Assert(txn->base_snapshot != NULL);
3547 
3548  if ((largest == NULL || txn->total_size > largest_size) &&
3549  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3550  rbtxn_has_streamable_change(txn))
3551  {
3552  largest = txn;
3553  largest_size = txn->total_size;
3554  }
3555  }
3556 
3557  return largest;
3558 }
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer * rb)
static

Definition at line 3482 of file reorderbuffer.c.

3483 {
3484  HASH_SEQ_STATUS hash_seq;
3485  ReorderBufferTXNByIdEnt *ent;
3486  ReorderBufferTXN *largest = NULL;
3487 
3488  hash_seq_init(&hash_seq, rb->by_txn);
3489  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3490  {
3491  ReorderBufferTXN *txn = ent->txn;
3492 
3493  /* if the current transaction is larger, remember it */
3494  if ((!largest) || (txn->size > largest->size))
3495  largest = txn;
3496  }
3497 
3498  Assert(largest);
3499  Assert(largest->size > 0);
3500  Assert(largest->size <= rb->size);
3501 
3502  return largest;
3503 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
ReorderBufferTXN * txn

References Assert(), ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2765 of file reorderbuffer.c.

2767 {
2768  ReorderBufferTXN *txn;
2769 
2770  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2771  false);
2772 
2773  /* unknown transaction, nothing to replay */
2774  if (txn == NULL)
2775  return;
2776 
2777  txn->txn_flags |= RBTXN_PREPARE;
2778  txn->gid = pstrdup(gid);
2779 
2780  /* The prepare info must have been updated in txn by now. */
2781  Assert(txn->final_lsn != InvalidXLogRecPtr);
2782 
2783  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2784  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2785 
2786  /*
2787  * We send the prepare for the concurrently aborted xacts so that later
2788  * when rollback prepared is decoded and sent, the downstream should be
2789  * able to rollback such a xact. See comments atop DecodePrepare.
2790  *
2791  * Note, for the concurrent_abort + streaming case a stream_prepare was
2792  * already sent within the ReorderBufferReplay call above.
2793  */
2794  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2795  rb->prepare(rb, txn, txn->final_lsn);
2796 }
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
ReorderBufferChange * change,
bool  toast_insert 
)
static

Definition at line 702 of file reorderbuffer.c.

705 {
706  ReorderBufferTXN *toptxn;
707 
708  /*
709  * The partial changes need to be processed only while streaming
710  * in-progress transactions.
711  */
712  if (!ReorderBufferCanStream(rb))
713  return;
714 
715  /* Get the top transaction. */
716  toptxn = rbtxn_get_toptxn(txn);
717 
718  /*
719  * Indicate a partial change for toast inserts. The change will be
720  * considered as complete once we get the insert or update on the main
721  * table and we are sure that the pending toast chunks are not required
722  * anymore.
723  *
724  * If we allow streaming when there are pending toast chunks then such
725  * chunks won't be released till the insert (multi_insert) is complete and
726  * we expect the txn to have streamed all changes after streaming. This
727  * restriction is mainly to ensure the correctness of streamed
728  * transactions and it doesn't seem worth uplifting such a restriction
729  * just to allow this case because anyway we will stream the transaction
730  * once such an insert is complete.
731  */
732  if (toast_insert)
733  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
734  else if (rbtxn_has_partial_change(toptxn) &&
735  IsInsertOrUpdate(change->action) &&
736  change->data.tp.clear_toast_afterwards)
737  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
738 
739  /*
740  * Indicate a partial change for speculative inserts. The change will be
741  * considered as complete once we get the speculative confirm or abort
742  * token.
743  */
744  if (IsSpecInsert(change->action))
745  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
746  else if (rbtxn_has_partial_change(toptxn) &&
747  IsSpecConfirmOrAbort(change->action))
748  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
749 
750  /*
751  * Stream the transaction if it is serialized before and the changes are
752  * now complete in the top-level transaction.
753  *
754  * The reason for doing the streaming of such a transaction as soon as we
755  * get the complete change for it is that previously it would have reached
756  * the memory threshold and wouldn't get streamed because of incomplete
757  * changes. Delaying such transactions would increase apply lag for them.
758  */
759  if (ReorderBufferCanStartStreaming(rb) &&
760  !(rbtxn_has_partial_change(toptxn)) &&
761  rbtxn_is_serialized(txn) &&
762  rbtxn_has_streamable_change(toptxn))
763  ReorderBufferStreamTXN(rb, toptxn);
764 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2065 of file reorderbuffer.c.

2070 {
2071  bool using_subtxn;
2073  ReorderBufferIterTXNState *volatile iterstate = NULL;
2074  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2075  ReorderBufferChange *volatile specinsert = NULL;
2076  volatile bool stream_started = false;
2077  ReorderBufferTXN *volatile curtxn = NULL;
2078 
2079  /* build data to be able to lookup the CommandIds of catalog tuples */
2081 
2082  /* setup the initial snapshot */
2083  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2084 
2085  /*
2086  * Decoding needs access to syscaches et al., which in turn use
2087  * heavyweight locks and such. Thus we need to have enough state around to
2088  * keep track of those. The easiest way is to simply use a transaction
2089  * internally. That also allows us to easily enforce that nothing writes
2090  * to the database by checking for xid assignments.
2091  *
2092  * When we're called via the SQL SRF there's already a transaction
2093  * started, so start an explicit subtransaction there.
2094  */
2095  using_subtxn = IsTransactionOrTransactionBlock();
2096 
2097  PG_TRY();
2098  {
2099  ReorderBufferChange *change;
2100  int changes_count = 0; /* used to accumulate the number of
2101  * changes */
2102 
2103  if (using_subtxn)
2104  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2105  else
2107 
2108  /*
2109  * We only need to send begin/begin-prepare for non-streamed
2110  * transactions.
2111  */
2112  if (!streaming)
2113  {
2114  if (rbtxn_prepared(txn))
2115  rb->begin_prepare(rb, txn);
2116  else
2117  rb->begin(rb, txn);
2118  }
2119 
2120  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2121  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2122  {
2123  Relation relation = NULL;
2124  Oid reloid;
2125 
2127 
2128  /*
2129  * We can't call start stream callback before processing first
2130  * change.
2131  */
2132  if (prev_lsn == InvalidXLogRecPtr)
2133  {
2134  if (streaming)
2135  {
2136  txn->origin_id = change->origin_id;
2137  rb->stream_start(rb, txn, change->lsn);
2138  stream_started = true;
2139  }
2140  }
2141 
2142  /*
2143  * Enforce correct ordering of changes, merged from multiple
2144  * subtransactions. The changes may have the same LSN due to
2145  * MULTI_INSERT xlog records.
2146  */
2147  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2148 
2149  prev_lsn = change->lsn;
2150 
2151  /*
2152  * Set the current xid to detect concurrent aborts. This is
2153  * required for the cases when we decode the changes before the
2154  * COMMIT record is processed.
2155  */
2156  if (streaming || rbtxn_prepared(change->txn))
2157  {
2158  curtxn = change->txn;
2159  SetupCheckXidLive(curtxn->xid);
2160  }
2161 
2162  switch (change->action)
2163  {
2165 
2166  /*
2167  * Confirmation for speculative insertion arrived. Simply
2168  * use as a normal record. It'll be cleaned up at the end
2169  * of INSERT processing.
2170  */
2171  if (specinsert == NULL)
2172  elog(ERROR, "invalid ordering of speculative insertion changes");
2173  Assert(specinsert->data.tp.oldtuple == NULL);
2174  change = specinsert;
2176 
2177  /* intentionally fall through */
2181  Assert(snapshot_now);
2182 
2183  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2184  change->data.tp.rlocator.relNumber);
2185 
2186  /*
2187  * Mapped catalog tuple without data, emitted while
2188  * catalog table was in the process of being rewritten. We
2189  * can fail to look up the relfilenumber, because the
2190  * relmapper has no "historic" view, in contrast to the
2191  * normal catalog during decoding. Thus repeated rewrites
2192  * can cause a lookup failure. That's OK because we do not
2193  * decode catalog changes anyway. Normally such tuples
2194  * would be skipped over below, but we can't identify
2195  * whether the table should be logically logged without
2196  * mapping the relfilenumber to the oid.
2197  */
2198  if (reloid == InvalidOid &&
2199  change->data.tp.newtuple == NULL &&
2200  change->data.tp.oldtuple == NULL)
2201  goto change_done;
2202  else if (reloid == InvalidOid)
2203  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2204  relpathperm(change->data.tp.rlocator,
2205  MAIN_FORKNUM));
2206 
2207  relation = RelationIdGetRelation(reloid);
2208 
2209  if (!RelationIsValid(relation))
2210  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2211  reloid,
2212  relpathperm(change->data.tp.rlocator,
2213  MAIN_FORKNUM));
2214 
2215  if (!RelationIsLogicallyLogged(relation))
2216  goto change_done;
2217 
2218  /*
2219  * Ignore temporary heaps created during DDL unless the
2220  * plugin has asked for them.
2221  */
2222  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2223  goto change_done;
2224 
2225  /*
2226  * For now ignore sequence changes entirely. Most of the
2227  * time they don't log changes using records we
2228  * understand, so it doesn't make sense to handle the few
2229  * cases we do.
2230  */
2231  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2232  goto change_done;
2233 
2234  /* user-triggered change */
2235  if (!IsToastRelation(relation))
2236  {
2237  ReorderBufferToastReplace(rb, txn, relation, change);
2238  ReorderBufferApplyChange(rb, txn, relation, change,
2239  streaming);
2240 
2241  /*
2242  * Only clear reassembled toast chunks if we're sure
2243  * they're not required anymore. The creator of the
2244  * tuple tells us.
2245  */
2246  if (change->data.tp.clear_toast_afterwards)
2247  ReorderBufferToastReset(rb, txn);
2248  }
2249  /* we're not interested in toast deletions */
2250  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2251  {
2252  /*
2253  * Need to reassemble the full toasted Datum in
2254  * memory, to ensure the chunks don't get reused till
2255  * we're done remove it from the list of this
2256  * transaction's changes. Otherwise it will get
2257  * freed/reused while restoring spooled data from
2258  * disk.
2259  */
2260  Assert(change->data.tp.newtuple != NULL);
2261 
2262  dlist_delete(&change->node);
2263  ReorderBufferToastAppendChunk(rb, txn, relation,
2264  change);
2265  }
2266 
2267  change_done:
2268 
2269  /*
2270  * If speculative insertion was confirmed, the record
2271  * isn't needed anymore.
2272  */
2273  if (specinsert != NULL)
2274  {
2275  ReorderBufferReturnChange(rb, specinsert, true);
2276  specinsert = NULL;
2277  }
2278 
2279  if (RelationIsValid(relation))
2280  {
2281  RelationClose(relation);
2282  relation = NULL;
2283  }
2284  break;
2285 
2287 
2288  /*
2289  * Speculative insertions are dealt with by delaying the
2290  * processing of the insert until the confirmation record
2291  * arrives. For that we simply unlink the record from the
2292  * chain, so it does not get freed/reused while restoring
2293  * spooled data from disk.
2294  *
2295  * This is safe in the face of concurrent catalog changes
2296  * because the relevant relation can't be changed between
2297  * speculative insertion and confirmation due to
2298  * CheckTableNotInUse() and locking.
2299  */
2300 
2301  /* clear out a pending (and thus failed) speculation */
2302  if (specinsert != NULL)
2303  {
2304  ReorderBufferReturnChange(rb, specinsert, true);
2305  specinsert = NULL;
2306  }
2307 
2308  /* and memorize the pending insertion */
2309  dlist_delete(&change->node);
2310  specinsert = change;
2311  break;
2312 
2314 
2315  /*
2316  * Abort for speculative insertion arrived. So cleanup the
2317  * specinsert tuple and toast hash.
2318  *
2319  * Note that we get the spec abort change for each toast
2320  * entry but we need to perform the cleanup only the first
2321  * time we get it for the main table.
2322  */
2323  if (specinsert != NULL)
2324  {
2325  /*
2326  * We must clean the toast hash before processing a
2327  * completely new tuple to avoid confusion about the
2328  * previous tuple's toast chunks.
2329  */
2330  Assert(change->data.tp.clear_toast_afterwards);
2331  ReorderBufferToastReset(rb, txn);
2332 
2333  /* We don't need this record anymore. */
2334  ReorderBufferReturnChange(rb, specinsert, true);
2335  specinsert = NULL;
2336  }
2337  break;
2338 
2340  {
2341  int i;
2342  int nrelids = change->data.truncate.nrelids;
2343  int nrelations = 0;
2344  Relation *relations;
2345 
2346  relations = palloc0(nrelids * sizeof(Relation));
2347  for (i = 0; i < nrelids; i++)
2348  {
2349  Oid relid = change->data.truncate.relids[i];
2350  Relation rel;
2351 
2352  rel = RelationIdGetRelation(relid);
2353 
2354  if (!RelationIsValid(rel))
2355  elog(ERROR, "could not open relation with OID %u", relid);
2356 
2357  if (!RelationIsLogicallyLogged(rel))
2358  continue;
2359 
2360  relations[nrelations++] = rel;
2361  }
2362 
2363  /* Apply the truncate. */
2364  ReorderBufferApplyTruncate(rb, txn, nrelations,
2365  relations, change,
2366  streaming);
2367 
2368  for (i = 0; i < nrelations; i++)
2369  RelationClose(relations[i]);
2370 
2371  break;
2372  }
2373 
2375  ReorderBufferApplyMessage(rb, txn, change, streaming);
2376  break;
2377 
2379  /* Execute the invalidation messages locally */
2380  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2381  change->data.inval.invalidations);
2382  break;
2383 
2385  /* get rid of the old */
2386  TeardownHistoricSnapshot(false);
2387 
2388  if (snapshot_now->copied)
2389  {
2390  ReorderBufferFreeSnap(rb, snapshot_now);
2391  snapshot_now =
2392  ReorderBufferCopySnap(rb, change->data.snapshot,
2393  txn, command_id);
2394  }
2395 
2396  /*
2397  * Restored from disk, need to be careful not to double
2398  * free. We could introduce refcounting for that, but for
2399  * now this seems infrequent enough not to care.
2400  */
2401  else if (change->data.snapshot->copied)
2402  {
2403  snapshot_now =
2404  ReorderBufferCopySnap(rb, change->data.snapshot,
2405  txn, command_id);
2406  }
2407  else
2408  {
2409  snapshot_now = change->data.snapshot;
2410  }
2411 
2412  /* and continue with the new one */
2413  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2414  break;
2415 
2417  Assert(change->data.command_id != InvalidCommandId);
2418 
2419  if (command_id < change->data.command_id)
2420  {
2421  command_id = change->data.command_id;
2422 
2423  if (!snapshot_now->copied)
2424  {
2425  /* we don't use the global one anymore */
2426  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2427  txn, command_id);
2428  }
2429 
2430  snapshot_now->curcid = command_id;
2431 
2432  TeardownHistoricSnapshot(false);
2433  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2434  }
2435 
2436  break;
2437 
2439  elog(ERROR, "tuplecid value in changequeue");
2440  break;
2441  }
2442 
2443  /*
2444  * It is possible that the data is not sent to downstream for a
2445  * long time either because the output plugin filtered it or there
2446  * is a DDL that generates a lot of data that is not processed by
2447  * the plugin. So, in such cases, the downstream can timeout. To
2448  * avoid that we try to send a keepalive message if required.
2449  * Trying to send a keepalive message after every change has some
2450  * overhead, but testing showed there is no noticeable overhead if
2451  * we do it after every ~100 changes.
2452  */
2453 #define CHANGES_THRESHOLD 100
2454 
2455  if (++changes_count >= CHANGES_THRESHOLD)
2456  {
2457  rb->update_progress_txn(rb, txn, change->lsn);
2458  changes_count = 0;
2459  }
2460  }
2461 
2462  /* speculative insertion record must be freed by now */
2463  Assert(!specinsert);
2464 
2465  /* clean up the iterator */
2466  ReorderBufferIterTXNFinish(rb, iterstate);
2467  iterstate = NULL;
2468 
2469  /*
2470  * Update total transaction count and total bytes processed by the
2471  * transaction and its subtransactions. Ensure to not count the
2472  * streamed transaction multiple times.
2473  *
2474  * Note that the statistics computation has to be done after
2475  * ReorderBufferIterTXNFinish as it releases the serialized change
2476  * which we have already accounted in ReorderBufferIterTXNNext.
2477  */
2478  if (!rbtxn_is_streamed(txn))
2479  rb->totalTxns++;
2480 
2481  rb->totalBytes += txn->total_size;
2482 
2483  /*
2484  * Done with current changes, send the last message for this set of
2485  * changes depending upon streaming mode.
2486  */
2487  if (streaming)
2488  {
2489  if (stream_started)
2490  {
2491  rb->stream_stop(rb, txn, prev_lsn);
2492  stream_started = false;
2493  }
2494  }
2495  else
2496  {
2497  /*
2498  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2499  * regular ones).
2500  */
2501  if (rbtxn_prepared(txn))
2502  rb->prepare(rb, txn, commit_lsn);
2503  else
2504  rb->commit(rb, txn, commit_lsn);
2505  }
2506 
2507  /* this is just a sanity check against bad output plugin behaviour */
2509  elog(ERROR, "output plugin used XID %u",
2511 
2512  /*
2513  * Remember the command ID and snapshot for the next set of changes in
2514  * streaming mode.
2515  */
2516  if (streaming)
2517  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2518  else if (snapshot_now->copied)
2519  ReorderBufferFreeSnap(rb, snapshot_now);
2520 
2521  /* cleanup */
2522  TeardownHistoricSnapshot(false);
2523 
2524  /*
2525  * Aborting the current (sub-)transaction as a whole has the right
2526  * semantics. We want all locks acquired in here to be released, not
2527  * reassigned to the parent and we do not want any database access
2528  * have persistent effects.
2529  */
2531 
2532  /* make sure there's no cache pollution */
2534 
2535  if (using_subtxn)
2537 
2538  /*
2539  * We are here due to one of the four reasons: 1. Decoding an
2540  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2541  * prepared txn that was (partially) streamed. 4. Decoding a committed
2542  * txn.
2543  *
2544  * For 1, we allow truncation of txn data by removing the changes
2545  * already streamed but still keeping other things like invalidations,
2546  * snapshot, and tuplecids. For 2 and 3, we indicate
2547  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2548  * data as the entire transaction has been decoded except for commit.
2549  * For 4, as the entire txn has been decoded, we can fully clean up
2550  * the TXN reorder buffer.
2551  */
2552  if (streaming || rbtxn_prepared(txn))
2553  {
2555  /* Reset the CheckXidAlive */
2557  }
2558  else
2559  ReorderBufferCleanupTXN(rb, txn);
2560  }
2561  PG_CATCH();
2562  {
2563  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2564  ErrorData *errdata = CopyErrorData();
2565 
2566  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2567  if (iterstate)
2568  ReorderBufferIterTXNFinish(rb, iterstate);
2569 
2571 
2572  /*
2573  * Force cache invalidation to happen outside of a valid transaction
2574  * to prevent catalog access as we just caught an error.
2575  */
2577 
2578  /* make sure there's no cache pollution */
2580  txn->invalidations);
2581 
2582  if (using_subtxn)
2584 
2585  /*
2586  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2587  * abort of the (sub)transaction we are streaming or preparing. We
2588  * need to do the cleanup and return gracefully on this error, see
2589  * SetupCheckXidLive.
2590  *
2591  * This error code can be thrown by one of the callbacks we call
2592  * during decoding so we need to ensure that we return gracefully only
2593  * when we are sending the data in streaming mode and the streaming is
2594  * not finished yet or when we are sending the data out on a PREPARE
2595  * during a two-phase commit.
2596  */
2597  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2598  (stream_started || rbtxn_prepared(txn)))
2599  {
2600  /* curtxn must be set for streaming or prepared transactions */
2601  Assert(curtxn);
2602 
2603  /* Cleanup the temporary error state. */
2604  FlushErrorState();
2605  FreeErrorData(errdata);
2606  errdata = NULL;
2607  curtxn->concurrent_abort = true;
2608 
2609  /* Reset the TXN so that it is allowed to stream remaining data. */
2610  ReorderBufferResetTXN(rb, txn, snapshot_now,
2611  command_id, prev_lsn,
2612  specinsert);
2613  }
2614  else
2615  {
2616  ReorderBufferCleanupTXN(rb, txn);
2617  MemoryContextSwitchTo(ecxt);
2618  PG_RE_THROW();
2619  }
2620  }
2621  PG_END_TRY();
2622 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:145
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1779
void FlushErrorState(void)
Definition: elog.c:1828
ErrorData * CopyErrorData(void)
Definition: elog.c:1723
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380
void * palloc0(Size size)
Definition: mcxt.c:1334
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:703
#define RelationIsValid(relation)
Definition: rel.h:480
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2062
void RelationClose(Relation relation)
Definition: relcache.c:2193
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:50
#define relpathperm(rlocator, forknum)
Definition: relpath.h:90
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
int sqlerrcode
Definition: elog.h:438
Form_pg_class rd_rel
Definition: rel.h:111
RepOriginId origin_id
Definition: reorderbuffer.h:80
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:97
void StartTransactionCommand(void)
Definition: xact.c:2995
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:468
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), 
ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3087 of file reorderbuffer.c.

3088 {
3089  /* many records won't have an xid assigned, centralize check here */
3090  if (xid != InvalidTransactionId)
3091  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3092 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 771 of file reorderbuffer.c.

773 {
774  ReorderBufferTXN *txn;
775 
776  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
777 
778  /*
779  * While streaming the previous changes we have detected that the
780  * transaction is aborted. So there is no point in collecting further
781  * changes for it.
782  */
783  if (txn->concurrent_abort)
784  {
785  /*
786  * We don't need to update memory accounting for this change as we
787  * have not added it to the queue yet.
788  */
789  ReorderBufferReturnChange(rb, change, false);
790  return;
791  }
792 
793  /*
794  * The changes that are sent downstream are considered streamable. We
795  * remember such transactions so that only those will later be considered
796  * for streaming.
797  */
798  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
804  {
805  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
806 
808  }
809 
810  change->lsn = lsn;
811  change->txn = txn;
812 
813  Assert(InvalidXLogRecPtr != lsn);
814  dlist_push_tail(&txn->changes, &change->node);
815  txn->nentries++;
816  txn->nentries_mem++;
817 
818  /* update memory accounting information */
819  ReorderBufferChangeMemoryUpdate(rb, change, true,
820  ReorderBufferChangeSize(change));
821 
822  /* process partial change */
823  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
824 
825  /* check the memory limits and evict something if needed */
827 }
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 834 of file reorderbuffer.c.

838 {
839  if (transactional)
840  {
841  MemoryContext oldcontext;
842  ReorderBufferChange *change;
843 
845 
846  /*
847  * We don't expect snapshots for transactional changes - we'll use the
848  * snapshot derived later during apply (unless the change gets
849  * skipped).
850  */
851  Assert(!snap);
852 
853  oldcontext = MemoryContextSwitchTo(rb->context);
854 
855  change = ReorderBufferGetChange(rb);
857  change->data.msg.prefix = pstrdup(prefix);
858  change->data.msg.message_size = message_size;
859  change->data.msg.message = palloc(message_size);
860  memcpy(change->data.msg.message, message, message_size);
861 
862  ReorderBufferQueueChange(rb, xid, lsn, change, false);
863 
864  MemoryContextSwitchTo(oldcontext);
865  }
866  else
867  {
868  ReorderBufferTXN *txn = NULL;
869  volatile Snapshot snapshot_now = snap;
870 
871  /* Non-transactional changes require a valid snapshot. */
872  Assert(snapshot_now);
873 
874  if (xid != InvalidTransactionId)
875  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
876 
877  /* setup snapshot to allow catalog access */
878  SetupHistoricSnapshot(snapshot_now, NULL);
879  PG_TRY();
880  {
881  rb->message(rb, txn, lsn, false, prefix, message_size, message);
882 
884  }
885  PG_CATCH();
886  {
888  PG_RE_THROW();
889  }
890  PG_END_TRY();
891  }
892 }

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2718 of file reorderbuffer.c.

2722 {
2723  ReorderBufferTXN *txn;
2724 
2725  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2726 
2727  /* unknown transaction, nothing to do */
2728  if (txn == NULL)
2729  return false;
2730 
2731  /*
2732  * Remember the prepare information to be later used by commit prepared in
2733  * case we skip doing prepare.
2734  */
2735  txn->final_lsn = prepare_lsn;
2736  txn->end_lsn = end_lsn;
2737  txn->xact_time.prepare_time = prepare_time;
2738  txn->origin_id = origin_id;
2739  txn->origin_lsn = origin_lsn;
2740 
2741  return true;
2742 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2635 of file reorderbuffer.c.

2640 {
2641  Snapshot snapshot_now;
2642  CommandId command_id = FirstCommandId;
2643 
2644  txn->final_lsn = commit_lsn;
2645  txn->end_lsn = end_lsn;
2646  txn->xact_time.commit_time = commit_time;
2647  txn->origin_id = origin_id;
2648  txn->origin_lsn = origin_lsn;
2649 
2650  /*
2651  * If the transaction was (partially) streamed, we need to commit it in a
2652  * 'streamed' way. That is, we first stream the remaining part of the
2653  * transaction, and then invoke stream_commit message.
2654  *
2655  * Called after everything (origin ID, LSN, ...) is stored in the
2656  * transaction to avoid passing that information directly.
2657  */
2658  if (rbtxn_is_streamed(txn))
2659  {
2660  ReorderBufferStreamCommit(rb, txn);
2661  return;
2662  }
2663 
2664  /*
2665  * If this transaction has no snapshot, it didn't make any changes to the
2666  * database, so there's nothing to decode. Note that
2667  * ReorderBufferCommitChild will have transferred any snapshots from
2668  * subtransactions if there were any.
2669  */
2670  if (txn->base_snapshot == NULL)
2671  {
2672  Assert(txn->ninvalidations == 0);
2673 
2674  /*
2675  * Removing this txn before a commit might result in the computation
2676  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2677  */
2678  if (!rbtxn_prepared(txn))
2679  ReorderBufferCleanupTXN(rb, txn);
2680  return;
2681  }
2682 
2683  snapshot_now = txn->base_snapshot;
2684 
2685  /* Process and send the changes to output plugin. */
2686  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2687  command_id, false);
2688 }
#define FirstCommandId
Definition: c.h:655
uint32 CommandId
Definition: c.h:653
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *  specinsert 
)
static

Definition at line 2022 of file reorderbuffer.c.

2027 {
2028  /* Discard the changes that we just streamed */
2030 
2031  /* Free all resources allocated for toast reconstruction */
2032  ReorderBufferToastReset(rb, txn);
2033 
2034  /* Return the spec insert change if it is not NULL */
2035  if (specinsert != NULL)
2036  {
2037  ReorderBufferReturnChange(rb, specinsert, true);
2038  specinsert = NULL;
2039  }
2040 
2041  /*
2042  * For the streaming case, stop the stream and remember the command ID and
2043  * snapshot for the streaming run.
2044  */
2045  if (rbtxn_is_streamed(txn))
2046  {
2047  rb->stream_stop(rb, txn, last_lsn);
2048  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2049  }
2050 }

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
char *  data 
)
static

Definition at line 4333 of file reorderbuffer.c.

4335 {
4336  ReorderBufferDiskChange *ondisk;
4337  ReorderBufferChange *change;
4338 
4339  ondisk = (ReorderBufferDiskChange *) data;
4340 
4341  change = ReorderBufferGetChange(rb);
4342 
4343  /* copy static part */
4344  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4345 
4346  data += sizeof(ReorderBufferDiskChange);
4347 
4348  /* restore individual stuff */
4349  switch (change->action)
4350  {
4351  /* fall through these, they're all similar enough */
4356  if (change->data.tp.oldtuple)
4357  {
4358  uint32 tuplelen = ((HeapTuple) data)->t_len;
4359 
4360  change->data.tp.oldtuple =
4362 
4363  /* restore ->tuple */
4364  memcpy(change->data.tp.oldtuple, data,
4365  sizeof(HeapTupleData));
4366  data += sizeof(HeapTupleData);
4367 
4368  /* reset t_data pointer into the new tuplebuf */
4369  change->data.tp.oldtuple->t_data =
4370  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4371 
4372  /* restore tuple data itself */
4373  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4374  data += tuplelen;
4375  }
4376 
4377  if (change->data.tp.newtuple)
4378  {
4379  /* here, data might not be suitably aligned! */
4380  uint32 tuplelen;
4381 
4382  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4383  sizeof(uint32));
4384 
4385  change->data.tp.newtuple =
4387 
4388  /* restore ->tuple */
4389  memcpy(change->data.tp.newtuple, data,
4390  sizeof(HeapTupleData));
4391  data += sizeof(HeapTupleData);
4392 
4393  /* reset t_data pointer into the new tuplebuf */
4394  change->data.tp.newtuple->t_data =
4395  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4396 
4397  /* restore tuple data itself */
4398  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4399  data += tuplelen;
4400  }
4401 
4402  break;
4404  {
4405  Size prefix_size;
4406 
4407  /* read prefix */
4408  memcpy(&prefix_size, data, sizeof(Size));
4409  data += sizeof(Size);
4410  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4411  prefix_size);
4412  memcpy(change->data.msg.prefix, data, prefix_size);
4413  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4414  data += prefix_size;
4415 
4416  /* read the message */
4417  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4418  data += sizeof(Size);
4419  change->data.msg.message = MemoryContextAlloc(rb->context,
4420  change->data.msg.message_size);
4421  memcpy(change->data.msg.message, data,
4422  change->data.msg.message_size);
4423  data += change->data.msg.message_size;
4424 
4425  break;
4426  }
4428  {
4429  Size inval_size = sizeof(SharedInvalidationMessage) *
4430  change->data.inval.ninvalidations;
4431 
4432  change->data.inval.invalidations =
4433  MemoryContextAlloc(rb->context, inval_size);
4434 
4435  /* read the message */
4436  memcpy(change->data.inval.invalidations, data, inval_size);
4437 
4438  break;
4439  }
4441  {
4442  Snapshot oldsnap;
4443  Snapshot newsnap;
4444  Size size;
4445 
4446  oldsnap = (Snapshot) data;
4447 
4448  size = sizeof(SnapshotData) +
4449  sizeof(TransactionId) * oldsnap->xcnt +
4450  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4451 
4453 
4454  newsnap = change->data.snapshot;
4455 
4456  memcpy(newsnap, data, size);
4457  newsnap->xip = (TransactionId *)
4458  (((char *) newsnap) + sizeof(SnapshotData));
4459  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4460  newsnap->copied = true;
4461  break;
4462  }
4463  /* the base struct contains all the data, easy peasy */
4465  {
4466  Oid *relids;
4467 
4468  relids = ReorderBufferGetRelids(rb,
4469  change->data.truncate.nrelids);
4470  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4471  change->data.truncate.relids = relids;
4472 
4473  break;
4474  }
4479  break;
4480  }
4481 
4482  dlist_push_tail(&txn->changes, &change->node);
4483  txn->nentries_mem++;
4484 
4485  /*
4486  * Update memory accounting for the restored change. We need to do this
4487  * although we don't check the memory limit when restoring the changes in
4488  * this branch (we only do that when initially queueing the changes after
4489  * decoding), because we will release the changes later, and that will
4490  * update the accounting too (subtracting the size from the counters). And
4491  * we don't want to underflow there.
4492  */
4493  ReorderBufferChangeMemoryUpdate(rb, change, true,
4494  ReorderBufferChangeSize(change));
4495 }
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), size, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
TXNEntryFile *  file,
XLogSegNo *  segno 
)
static

Definition at line 4190 of file reorderbuffer.c.

4192 {
4193  Size restored = 0;
4194  XLogSegNo last_segno;
4195  dlist_mutable_iter cleanup_iter;
4196  File *fd = &file->vfd;
4197 
4200 
4201  /* free current entries, so we have memory for more */
4202  dlist_foreach_modify(cleanup_iter, &txn->changes)
4203  {
4205  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4206 
4207  dlist_delete(&cleanup->node);
4209  }
4210  txn->nentries_mem = 0;
4211  Assert(dlist_is_empty(&txn->changes));
4212 
4213  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4214 
4215  while (restored < max_changes_in_memory && *segno <= last_segno)
4216  {
4217  int readBytes;
4218  ReorderBufferDiskChange *ondisk;
4219 
4221 
4222  if (*fd == -1)
4223  {
4224  char path[MAXPGPATH];
4225 
4226  /* first time in */
4227  if (*segno == 0)
4228  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4229 
4230  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4231 
4232  /*
4233  * No need to care about TLIs here, only used during a single run,
4234  * so each LSN only maps to a specific WAL record.
4235  */
4237  *segno);
4238 
4239  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4240 
4241  /* No harm in resetting the offset even in case of failure */
4242  file->curOffset = 0;
4243 
4244  if (*fd < 0 && errno == ENOENT)
4245  {
4246  *fd = -1;
4247  (*segno)++;
4248  continue;
4249  }
4250  else if (*fd < 0)
4251  ereport(ERROR,
4253  errmsg("could not open file \"%s\": %m",
4254  path)));
4255  }
4256 
4257  /*
4258  * Read the statically sized part of a change which has information
4259  * about the total size. If we couldn't read a record, we're at the
4260  * end of this file.
4261  */
4263  readBytes = FileRead(file->vfd, rb->outbuf,
4264  sizeof(ReorderBufferDiskChange),
4265  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4266 
4267  /* eof */
4268  if (readBytes == 0)
4269  {
4270  FileClose(*fd);
4271  *fd = -1;
4272  (*segno)++;
4273  continue;
4274  }
4275  else if (readBytes < 0)
4276  ereport(ERROR,
4278  errmsg("could not read from reorderbuffer spill file: %m")));
4279  else if (readBytes != sizeof(ReorderBufferDiskChange))
4280  ereport(ERROR,
4282  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4283  readBytes,
4284  (uint32) sizeof(ReorderBufferDiskChange))));
4285 
4286  file->curOffset += readBytes;
4287 
4288  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4289 
4291  sizeof(ReorderBufferDiskChange) + ondisk->size);
4292  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4293 
4294  readBytes = FileRead(file->vfd,
4295  rb->outbuf + sizeof(ReorderBufferDiskChange),
4296  ondisk->size - sizeof(ReorderBufferDiskChange),
4297  file->curOffset,
4298  WAIT_EVENT_REORDER_BUFFER_READ);
4299 
4300  if (readBytes < 0)
4301  ereport(ERROR,
4303  errmsg("could not read from reorderbuffer spill file: %m")));
4304  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4305  ereport(ERROR,
4307  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4308  readBytes,
4309  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4310 
4311  file->curOffset += readBytes;
4312 
4313  /*
4314  * ok, read a full change from disk, now restore it into proper
4315  * in-memory format
4316  */
4317  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4318  restored++;
4319  }
4320 
4321  return restored;
4322 }
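ReorderBufferChange *cleanup
Local variable of ReorderBufferRestoreChanges() holding the change currently being unlinked from txn->changes; Doxygen mis-linked it to the unrelated cleanup() function at bootstrap.c:682.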
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 4501 of file reorderbuffer.c.

4502 {
4503  XLogSegNo first;
4504  XLogSegNo cur;
4505  XLogSegNo last;
4506 
4509 
4510  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4511  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4512 
4513  /* iterate over all possible filenames, and delete them */
4514  for (cur = first; cur <= last; cur++)
4515  {
4516  char path[MAXPGPATH];
4517 
4519  if (unlink(path) != 0 && errno != ENOENT)
4520  ereport(ERROR,
4522  errmsg("could not remove file \"%s\": %m", path)));
4523  }
4524 }
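XLogSegNo cur
Local segment-number loop variable of ReorderBufferRestoreCleanup() (declared at line 4504); Doxygen mis-linked it to the unrelated struct cursor * cur at ecpg.c:28.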

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
bool  upd_mem 
)

Definition at line 483 of file reorderbuffer.c.

485 {
486  /* update memory accounting info */
487  if (upd_mem)
488  ReorderBufferChangeMemoryUpdate(rb, change, false,
489  ReorderBufferChangeSize(change));
490 
491  /* free contained data */
492  switch (change->action)
493  {
498  if (change->data.tp.newtuple)
499  {
500  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
501  change->data.tp.newtuple = NULL;
502  }
503 
504  if (change->data.tp.oldtuple)
505  {
506  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
507  change->data.tp.oldtuple = NULL;
508  }
509  break;
511  if (change->data.msg.prefix != NULL)
512  pfree(change->data.msg.prefix);
513  change->data.msg.prefix = NULL;
514  if (change->data.msg.message != NULL)
515  pfree(change->data.msg.message);
516  change->data.msg.message = NULL;
517  break;
519  if (change->data.inval.invalidations)
520  pfree(change->data.inval.invalidations);
521  change->data.inval.invalidations = NULL;
522  break;
524  if (change->data.snapshot)
525  {
526  ReorderBufferFreeSnap(rb, change->data.snapshot);
527  change->data.snapshot = NULL;
528  }
529  break;
530  /* no data in addition to the struct itself */
532  if (change->data.truncate.relids != NULL)
533  {
534  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
535  change->data.truncate.relids = NULL;
536  }
537  break;
542  break;
543  }
544 
545  pfree(change);
546 }
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer *  rb,
Oid *  relids 
)

Definition at line 602 of file reorderbuffer.c.

603 {
604  pfree(relids);
605 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 571 of file reorderbuffer.c.

572 {
573  pfree(tuple);
574 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 429 of file reorderbuffer.c.

430 {
431  /* clean the lookup cache if we were cached (quite likely) */
432  if (rb->by_txn_last_xid == txn->xid)
433  {
435  rb->by_txn_last_txn = NULL;
436  }
437 
438  /* free data that's contained */
439 
440  if (txn->gid != NULL)
441  {
442  pfree(txn->gid);
443  txn->gid = NULL;
444  }
445 
446  if (txn->tuplecid_hash != NULL)
447  {
449  txn->tuplecid_hash = NULL;
450  }
451 
452  if (txn->invalidations)
453  {
454  pfree(txn->invalidations);
455  txn->invalidations = NULL;
456  }
457 
458  /* Reset the toast hash */
459  ReorderBufferToastReset(rb, txn);
460 
461  pfree(txn);
462 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2001 of file reorderbuffer.c.

2003 {
2004  txn->command_id = command_id;
2005 
2006  /* Avoid copying if it's already copied. */
2007  if (snapshot_now->copied)
2008  txn->snapshot_now = snapshot_now;
2009  else
2010  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2011  txn, command_id);
2012 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  fd,
ReorderBufferChange *  change 
)
static

Definition at line 3738 of file reorderbuffer.c.

3740 {
3741  ReorderBufferDiskChange *ondisk;
3742  Size sz = sizeof(ReorderBufferDiskChange);
3743 
3745 
3746  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3747  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3748 
3749  switch (change->action)
3750  {
3751  /* fall through these, they're all similar enough */
3756  {
3757  char *data;
3758  HeapTuple oldtup,
3759  newtup;
3760  Size oldlen = 0;
3761  Size newlen = 0;
3762 
3763  oldtup = change->data.tp.oldtuple;
3764  newtup = change->data.tp.newtuple;
3765 
3766  if (oldtup)
3767  {
3768  sz += sizeof(HeapTupleData);
3769  oldlen = oldtup->t_len;
3770  sz += oldlen;
3771  }
3772 
3773  if (newtup)
3774  {
3775  sz += sizeof(HeapTupleData);
3776  newlen = newtup->t_len;
3777  sz += newlen;
3778  }
3779 
3780  /* make sure we have enough space */
3782 
3783  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3784  /* might have been reallocated above */
3785  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3786 
3787  if (oldlen)
3788  {
3789  memcpy(data, oldtup, sizeof(HeapTupleData));
3790  data += sizeof(HeapTupleData);
3791 
3792  memcpy(data, oldtup->t_data, oldlen);
3793  data += oldlen;
3794  }
3795 
3796  if (newlen)
3797  {
3798  memcpy(data, newtup, sizeof(HeapTupleData));
3799  data += sizeof(HeapTupleData);
3800 
3801  memcpy(data, newtup->t_data, newlen);
3802  data += newlen;
3803  }
3804  break;
3805  }
3807  {
3808  char *data;
3809  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3810 
3811  sz += prefix_size + change->data.msg.message_size +
3812  sizeof(Size) + sizeof(Size);
3814 
3815  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3816 
3817  /* might have been reallocated above */
3818  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3819 
3820  /* write the prefix including the size */
3821  memcpy(data, &prefix_size, sizeof(Size));
3822  data += sizeof(Size);
3823  memcpy(data, change->data.msg.prefix,
3824  prefix_size);
3825  data += prefix_size;
3826 
3827  /* write the message including the size */
3828  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3829  data += sizeof(Size);
3830  memcpy(data, change->data.msg.message,
3831  change->data.msg.message_size);
3832  data += change->data.msg.message_size;
3833 
3834  break;
3835  }
3837  {
3838  char *data;
3839  Size inval_size = sizeof(SharedInvalidationMessage) *
3840  change->data.inval.ninvalidations;
3841 
3842  sz += inval_size;
3843 
3845  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3846 
3847  /* might have been reallocated above */
3848  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3849  memcpy(data, change->data.inval.invalidations, inval_size);
3850  data += inval_size;
3851 
3852  break;
3853  }
3855  {
3856  Snapshot snap;
3857  char *data;
3858 
3859  snap = change->data.snapshot;
3860 
3861  sz += sizeof(SnapshotData) +
3862  sizeof(TransactionId) * snap->xcnt +
3863  sizeof(TransactionId) * snap->subxcnt;
3864 
3865  /* make sure we have enough space */
3867  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3868  /* might have been reallocated above */
3869  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3870 
3871  memcpy(data, snap, sizeof(SnapshotData));
3872  data += sizeof(SnapshotData);
3873 
3874  if (snap->xcnt)
3875  {
3876  memcpy(data, snap->xip,
3877  sizeof(TransactionId) * snap->xcnt);
3878  data += sizeof(TransactionId) * snap->xcnt;
3879  }
3880 
3881  if (snap->subxcnt)
3882  {
3883  memcpy(data, snap->subxip,
3884  sizeof(TransactionId) * snap->subxcnt);
3885  data += sizeof(TransactionId) * snap->subxcnt;
3886  }
3887  break;
3888  }
3890  {
3891  Size size;
3892  char *data;
3893 
3894  /* account for the OIDs of truncated relations */
3895  size = sizeof(Oid) * change->data.truncate.nrelids;
3896  sz += size;
3897 
3898  /* make sure we have enough space */
3900 
3901  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3902  /* might have been reallocated above */
3903  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3904 
3905  memcpy(data, change->data.truncate.relids, size);
3906  data += size;
3907 
3908  break;
3909  }
3914  /* ReorderBufferChange contains everything important */
3915  break;
3916  }
3917 
3918  ondisk->size = sz;
3919 
3920  errno = 0;
3921  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3922  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3923  {
3924  int save_errno = errno;
3925 
3927 
3928  /* if write didn't set errno, assume problem is no disk space */
3929  errno = save_errno ? save_errno : ENOSPC;
3930  ereport(ERROR,
3932  errmsg("could not write to data file for XID %u: %m",
3933  txn->xid)));
3934  }
3936 
3937  /*
3938  * Keep the transaction's final_lsn up to date with each change we send to
3939  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3940  * only do this on commit and abort records, but that doesn't work if a
3941  * system crash leaves a transaction without its abort record).
3942  *
3943  * Make sure not to move it backwards.
3944  */
3945  if (txn->final_lsn < change->lsn)
3946  txn->final_lsn = change->lsn;
3947 
3948  Assert(ondisk->change.action == change->action);
3949 }
#define write(a, b, c)
Definition: win32.h:14
if
Spurious cross-reference: Doxygen linked the ordinary C if statements in this listing to unrelated code at isn.c:77.

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot *  slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4570 of file reorderbuffer.c.

4572 {
4573  XLogRecPtr recptr;
4574 
4575  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4576 
4577  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4579  xid, LSN_FORMAT_ARGS(recptr));
4580 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer *  rb,
Size  sz 
)
static

Definition at line 3456 of file reorderbuffer.c.

3457 {
3458  if (!rb->outbufsize)
3459  {
3460  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3461  rb->outbufsize = sz;
3462  }
3463  else if (rb->outbufsize < sz)
3464  {
3465  rb->outbuf = repalloc(rb->outbuf, sz);
3466  rb->outbufsize = sz;
3467  }
3468 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 3646 of file reorderbuffer.c.

3647 {
3648  dlist_iter subtxn_i;
3649  dlist_mutable_iter change_i;
3650  int fd = -1;
3651  XLogSegNo curOpenSegNo = 0;
3652  Size spilled = 0;
3653  Size size = txn->size;
3654 
3655  elog(DEBUG2, "spill %u changes in XID %u to disk",
3656  (uint32) txn->nentries_mem, txn->xid);
3657 
3658  /* do the same to all child TXs */
3659  dlist_foreach(subtxn_i, &txn->subtxns)
3660  {
3661  ReorderBufferTXN *subtxn;
3662 
3663  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3664  ReorderBufferSerializeTXN(rb, subtxn);
3665  }
3666 
3667  /* serialize changestream */
3668  dlist_foreach_modify(change_i, &txn->changes)
3669  {
3670  ReorderBufferChange *change;
3671 
3672  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3673 
3674  /*
3675  * store in segment in which it belongs by start lsn, don't split over
3676  * multiple segments tho
3677  */
3678  if (fd == -1 ||
3679  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3680  {
3681  char path[MAXPGPATH];
3682 
3683  if (fd != -1)
3685 
3686  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3687 
3688  /*
3689  * No need to care about TLIs here, only used during a single run,
3690  * so each LSN only maps to a specific WAL record.
3691  */
3693  curOpenSegNo);
3694 
3695  /* open segment, create it if necessary */
3696  fd = OpenTransientFile(path,
3697  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3698 
3699  if (fd < 0)
3700  ereport(ERROR,
3702  errmsg("could not open file \"%s\": %m", path)));
3703  }
3704 
3705  ReorderBufferSerializeChange(rb, txn, fd, change);
3706  dlist_delete(&change->node);
3707  ReorderBufferReturnChange(rb, change, true);
3708 
3709  spilled++;
3710  }
3711 
3712  /* update the statistics iff we have spilled anything */
3713  if (spilled)
3714  {
3715  rb->spillCount += 1;
3716  rb->spillBytes += size;
3717 
3718  /* don't consider already serialized transactions */
3719  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3720 
3721  /* update the decoding stats */
3723  }
3724 
3725  Assert(spilled == txn->nentries_mem);
3726  Assert(dlist_is_empty(&txn->changes));
3727  txn->nentries_mem = 0;
3729 
3730  if (fd != -1)
3732 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1924
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), size, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3118 of file reorderbuffer.c.

3120 {
3121  ReorderBufferTXN *txn;
3122  bool is_new;
3123 
3124  Assert(snap != NULL);
3125 
3126  /*
3127  * Fetch the transaction to operate on. If we know it's a subtransaction,
3128  * operate on its top-level transaction instead.
3129  */
3130  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3131  if (rbtxn_is_known_subxact(txn))
3132  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3133  NULL, InvalidXLogRecPtr, false);
3134  Assert(txn->base_snapshot == NULL);
3135 
3136  txn->base_snapshot = snap;
3137  txn->base_snapshot_lsn = lsn;
3139 
3140  AssertTXNLsnOrder(rb);
3141 }

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

/*
 * ReorderBufferSetRestartPoint
 *		Record ptr as the reorder buffer's current restart decoding LSN.
 *
 * Definition at line 1048 of file reorderbuffer.c.
 *
 * Only rb->current_restart_decoding_lsn is written; nothing else changes.
 */
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
	rb->current_restart_decoding_lsn = ptr;
}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

/*
 * ReorderBufferSkipPrepare
 *		Flag transaction xid as having had its PREPARE skipped.
 *
 * Definition at line 2746 of file reorderbuffer.c.
 *
 * Sets RBTXN_SKIPPED_PREPARE in the transaction's txn_flags.  The lookup does
 * not create entries, so an xid unknown to the reorder buffer is ignored.
 */
void
ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);

	/* unknown transaction, nothing to do */
	if (txn == NULL)
		return;

	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
}
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

/*
 * ReorderBufferStreamCommit
 *		Finish a transaction that has already been (partially) streamed.
 *
 * Definition at line 1866 of file reorderbuffer.c.
 *
 * First streams whatever changes remain via ReorderBufferStreamTXN().  A
 * prepared transaction then gets the stream_prepare callback and is only
 * truncated (full cleanup happens at COMMIT PREPARED), after which
 * CheckXidAlive is reset.  Otherwise the stream_commit callback is invoked
 * and the transaction is cleaned up entirely.
 */
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	/* we should only call this for previously streamed transactions */
	Assert(rbtxn_is_streamed(txn));

	ReorderBufferStreamTXN(rb, txn);

	if (rbtxn_prepared(txn))
	{
		/*
		 * Note, we send stream prepare even if a concurrent abort is
		 * detected. See DecodePrepare for more information.
		 */
		rb->stream_prepare(rb, txn, txn->final_lsn);

		/*
		 * This is a PREPARED transaction, part of a two-phase commit. The
		 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
		 * just truncate txn by removing changes and tuplecids.
		 */
		ReorderBufferTruncateTXN(rb, txn, true);
		/* Reset the CheckXidAlive */
		CheckXidAlive = InvalidTransactionId;
	}
	else
	{
		rb->stream_commit(rb, txn, txn->final_lsn);
		ReorderBufferCleanupTXN(rb, txn);
	}
}
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

/*
 * ReorderBufferStreamTXN
 *		Send the changes decoded so far for an in-progress top-level
 *		transaction to the output plugin.
 *
 * Definition at line 3988 of file reorderbuffer.c.
 *
 * On the first run (txn->snapshot_now == NULL) subtransaction base snapshots
 * are pulled up with ReorderBufferTransferSnapToParent() and a copy of the
 * base snapshot is used; if there is still no base snapshot, nothing is
 * sent.  Later runs copy txn->snapshot_now (adding new subxacts) and free
 * the previous copy.  Streaming statistics are accumulated only after
 * ReorderBufferProcessTXN() returns, so an error mid-stream does not count.
 */
static void
ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	Snapshot	snapshot_now;
	CommandId	command_id;
	Size		stream_bytes;
	bool		txn_is_streamed;

	/* We can never reach here for a subtransaction. */
	Assert(rbtxn_is_toptxn(txn));

	/*
	 * We can't make any assumptions about base snapshot here, similar to what
	 * ReorderBufferCommit() does. That relies on base_snapshot getting
	 * transferred from subxact in ReorderBufferCommitChild(), but that was
	 * not yet called as the transaction is in-progress.
	 *
	 * So just walk the subxacts and use the same logic here. But we only need
	 * to do that once, when the transaction is streamed for the first time.
	 * After that we need to reuse the snapshot from the previous run.
	 *
	 * Unlike DecodeCommit which adds xids of all the subtransactions in
	 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
	 * we do add them to subxip array instead via ReorderBufferCopySnap. This
	 * allows the catalog changes made in subtransactions decoded till now to
	 * be visible.
	 */
	if (txn->snapshot_now == NULL)
	{
		dlist_iter	subxact_i;

		/* make sure this transaction is streamed for the first time */
		Assert(!rbtxn_is_streamed(txn));

		/* at the beginning we should have invalid command ID */
		Assert(txn->command_id == InvalidCommandId);

		dlist_foreach(subxact_i, &txn->subtxns)
		{
			ReorderBufferTXN *subtxn;

			subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
			ReorderBufferTransferSnapToParent(txn, subtxn);
		}

		/*
		 * If this transaction has no snapshot, it didn't make any changes to
		 * the database till now, so there's nothing to decode.
		 */
		if (txn->base_snapshot == NULL)
		{
			Assert(txn->ninvalidations == 0);
			return;
		}

		command_id = FirstCommandId;
		snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
											 txn, command_id);
	}
	else
	{
		/* the transaction must have been already streamed */
		Assert(rbtxn_is_streamed(txn));

		/*
		 * Nah, we already have snapshot from the previous streaming run. We
		 * assume new subxacts can't move the LSN backwards, and so can't beat
		 * the LSN condition in the previous branch (so no need to walk
		 * through subxacts again). In fact, we must not do that as we may be
		 * using snapshot half-way through the subxact.
		 */
		command_id = txn->command_id;

		/*
		 * We can't use txn->snapshot_now directly because after the last
		 * streaming run, we might have got some new sub-transactions. So we
		 * need to add them to the snapshot.
		 */
		snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
											 txn, command_id);

		/* Free the previously copied snapshot. */
		Assert(txn->snapshot_now->copied);
		ReorderBufferFreeSnap(rb, txn->snapshot_now);
		txn->snapshot_now = NULL;
	}

	/*
	 * Remember this information to be used later to update stats. We can't
	 * update the stats here as an error while processing the changes would
	 * lead to the accumulation of stats even though we haven't streamed all
	 * the changes.
	 */
	txn_is_streamed = rbtxn_is_streamed(txn);
	stream_bytes = txn->total_size;

	/* Process and send the changes to output plugin. */
	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
							command_id, true);

	rb->streamCount += 1;
	rb->streamBytes += stream_bytes;

	/* Don't consider already streamed transaction. */
	rb->streamTxns += (txn_is_streamed) ? 0 : 1;

	/* update the decoding stats */
	UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);

	Assert(dlist_is_empty(&txn->changes));
	Assert(txn->nentries == 0);
	Assert(txn->nentries_mem == 0);
}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

/*
 * ReorderBufferToastAppendChunk
 *		Remember one TOAST chunk so the toasted value can be reassembled later.
 *
 * Definition at line 4641 of file reorderbuffer.c.
 *
 * relation must be a toast relation and change must carry its new tuple.
 * Attribute 1 of that tuple is the chunk id (the key in txn->toast_hash,
 * which is created on demand), attribute 2 the chunk sequence number and
 * attribute 3 the chunk data.  Chunks of one value must arrive with
 * consecutive sequence numbers starting at 0; anything else raises ERROR.
 * The change itself is linked into the entry's chunk list, and the chunk's
 * payload size is added to the entry's running size.
 */
static void
ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change)
{
	ReorderBufferToastEnt *ent;
	HeapTuple	newtup;
	bool		found;
	int32		chunksize;
	bool		isnull;
	Pointer		chunk;
	TupleDesc	desc = RelationGetDescr(relation);
	Oid			chunk_id;
	int32		chunk_seq;

	if (txn->toast_hash == NULL)
		ReorderBufferToastInitHash(rb, txn);

	Assert(IsToastRelation(relation));

	newtup = change->data.tp.newtuple;
	chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
	Assert(!isnull);
	chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
	Assert(!isnull);

	ent = (ReorderBufferToastEnt *)
		hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);

	if (!found)
	{
		Assert(ent->chunk_id == chunk_id);
		ent->num_chunks = 0;
		ent->last_chunk_seq = 0;
		ent->size = 0;
		ent->reconstructed = NULL;
		dlist_init(&ent->chunks);

		if (chunk_seq != 0)
			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
				 chunk_seq, chunk_id);
	}
	else if (found && chunk_seq != ent->last_chunk_seq + 1)
		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);

	chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
	Assert(!isnull);

	/* calculate size so we can allocate the right size at once later */
	if (!VARATT_IS_EXTENDED(chunk))
		chunksize = VARSIZE(chunk) - VARHDRSZ;
	else if (VARATT_IS_SHORT(chunk))
		/* could happen due to heap_form_tuple doing its thing */
		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
	else
		elog(ERROR, "unexpected type of toast chunk");

	ent->size += chunksize;
	ent->last_chunk_seq = chunk_seq;
	ent->num_chunks++;
	dlist_push_tail(&ent->chunks, &change->node);
}
char * Pointer
Definition: c.h:470
#define VARHDRSZ
Definition: c.h:679
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:749
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define RelationGetDescr(relation)
Definition: rel.h:533
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define VARSIZE(PTR)
Definition: varatt.h:279

References Assert(), ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

/*
 * ReorderBufferToastInitHash
 *		Create txn->toast_hash, keyed by toast chunk id (an Oid).
 *
 * Definition at line 4621 of file reorderbuffer.c.
 *
 * Entries are ReorderBufferToastEnt; the table is allocated in rb->context.
 * Must only be called while txn has no toast hash yet.
 */
static void
ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	HASHCTL		hash_ctl;

	Assert(txn->toast_hash == NULL);

	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
	hash_ctl.hcxt = rb->context;
	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert(), ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

/*
 * ReorderBufferToastReplace
 *		Substitute reassembled TOAST values into change's new tuple.
 *
 * Definition at line 4724 of file reorderbuffer.c.
 *
 * For every non-dropped, non-null external varlena column whose value id has
 * chunks in txn->toast_hash, the chunks are concatenated into a freshly
 * allocated datum (stored in the hash entry's "reconstructed" field) and the
 * column is replaced by an indirect external pointer to it.  The rebuilt
 * tuple is copied back over change->data.tp.newtuple in place, and the
 * change's memory accounting is moved from the old to the new size.  The
 * reassembled values themselves are freed in ReorderBufferToastReset().
 */
static void
ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change)
{
	TupleDesc	desc;
	int			natt;
	Datum	   *attrs;
	bool	   *isnull;
	bool	   *free;
	HeapTuple	tmphtup;
	Relation	toast_rel;
	TupleDesc	toast_desc;
	MemoryContext oldcontext;
	HeapTuple	newtup;
	Size		old_size;

	/* no toast tuples changed */
	if (txn->toast_hash == NULL)
		return;

	/*
	 * We're going to modify the size of the change. So, to make sure the
	 * accounting is correct we record the current change size and then after
	 * re-computing the change we'll subtract the recorded size and then
	 * re-add the new change size at the end. We don't immediately subtract
	 * the old size because if there is any error before we add the new size,
	 * we will release the changes and that will update the accounting info
	 * (subtracting the size from the counters). And we don't want to
	 * underflow there.
	 */
	old_size = ReorderBufferChangeSize(change);

	oldcontext = MemoryContextSwitchTo(rb->context);

	/* we should only have toast tuples in an INSERT or UPDATE */
	Assert(change->data.tp.newtuple);

	desc = RelationGetDescr(relation);

	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
	if (!RelationIsValid(toast_rel))
		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));

	toast_desc = RelationGetDescr(toast_rel);

	/* should we allocate from stack instead? */
	attrs = palloc0(sizeof(Datum) * desc->natts);
	isnull = palloc0(sizeof(bool) * desc->natts);
	free = palloc0(sizeof(bool) * desc->natts);

	newtup = change->data.tp.newtuple;

	heap_deform_tuple(newtup, desc, attrs, isnull);

	for (natt = 0; natt < desc->natts; natt++)
	{
		Form_pg_attribute attr = TupleDescAttr(desc, natt);
		ReorderBufferToastEnt *ent;
		struct varlena *varlena;

		/* va_rawsize is the size of the original datum -- including header */
		struct varatt_external toast_pointer;
		struct varatt_indirect redirect_pointer;
		struct varlena *new_datum = NULL;
		struct varlena *reconstructed;
		dlist_iter	it;
		Size		data_done = 0;

		/* system columns aren't toasted */
		if (attr->attnum < 0)
			continue;

		if (attr->attisdropped)
			continue;

		/* not a varlena datatype */
		if (attr->attlen != -1)
			continue;

		/* no data */
		if (isnull[natt])
			continue;

		/* ok, we know we have a toast datum */
		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);

		/* no need to do anything if the tuple isn't external */
		if (!VARATT_IS_EXTERNAL(varlena))
			continue;

		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);

		/*
		 * Check whether the toast tuple changed, replace if so.
		 */
		ent = (ReorderBufferToastEnt *)
			hash_search(txn->toast_hash,
						&toast_pointer.va_valueid,
						HASH_FIND,
						NULL);
		if (ent == NULL)
			continue;

		/* holds the indirect pointer that replaces the on-disk toast pointer */
		new_datum =
			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);

		free[natt] = true;

		reconstructed = palloc0(toast_pointer.va_rawsize);

		ent->reconstructed = reconstructed;

		/* stitch toast tuple back together from its parts */
		dlist_foreach(it, &ent->chunks)
		{
			bool		cisnull;
			ReorderBufferChange *cchange;
			HeapTuple	ctup;
			Pointer		chunk;

			cchange = dlist_container(ReorderBufferChange, node, it.cur);
			ctup = cchange->data.tp.newtuple;
			chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));

			Assert(!cisnull);
			Assert(!VARATT_IS_EXTERNAL(chunk));
			Assert(!VARATT_IS_SHORT(chunk));

			memcpy(VARDATA(reconstructed) + data_done,
				   VARDATA(chunk),
				   VARSIZE(chunk) - VARHDRSZ);
			data_done += VARSIZE(chunk) - VARHDRSZ;
		}
		Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));

		/* make sure its marked as compressed or not */
		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
		else
			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);

		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
		redirect_pointer.pointer = reconstructed;

		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
			   sizeof(redirect_pointer));

		attrs[natt] = PointerGetDatum(new_datum);
	}

	/*
	 * Build tuple in separate memory & copy tuple back into the tuplebuf
	 * passed to the output plugin. We can't directly heap_fill_tuple() into
	 * the tuplebuf because attrs[] will point back into the current content.
	 */
	tmphtup = heap_form_tuple(desc, attrs, isnull);
	Assert(newtup->t_len <= MaxHeapTupleSize);
	Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));

	memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
	newtup->t_len = tmphtup->t_len;

	/*
	 * free resources we won't further need, more persistent stuff will be
	 * free'd in ReorderBufferToastReset().
	 */
	RelationClose(toast_rel);
	pfree(tmphtup);
	for (natt = 0; natt < desc->natts; natt++)
	{
		if (free[natt])
			pfree(DatumGetPointer(attrs[natt]));
	}
	pfree(attrs);
	pfree(free);
	pfree(isnull);

	MemoryContextSwitchTo(oldcontext);

	/* subtract the old change size */
	ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
	/* now add the change back, with the correct size */
	ReorderBufferChangeMemoryUpdate(rb, change, true,
									ReorderBufferChangeSize(change));
}
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1345
#define MaxHeapTupleSize
Definition: htup_details.h:558
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define RelationGetRelationName(relation)
Definition: rel.h:541
Definition: c.h:674
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86

References Assert(), ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescAttr, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

/*
 * ReorderBufferToastReset
 *		Release all TOAST reassembly state of txn and destroy its toast_hash.
 *
 * Definition at line 4915 of file reorderbuffer.c.
 *
 * For each hash entry, frees the reassembled value (if any) and unlinks and
 * returns every stored chunk change to the reorder buffer.  Afterwards
 * txn->toast_hash is NULL.  Does nothing if there is no hash.
 */
static void
ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferToastEnt *ent;

	if (txn->toast_hash == NULL)
		return;

	/* sequentially walk over the hash and free everything */
	hash_seq_init(&hstat, txn->toast_hash);
	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
	{
		dlist_mutable_iter it;

		if (ent->reconstructed != NULL)
			pfree(ent->reconstructed);

		dlist_foreach_modify(it, &ent->chunks)
		{
			ReorderBufferChange *change =
				dlist_container(ReorderBufferChange, node, it.cur);

			dlist_delete(&change->node);
			ReorderBufferReturnChange(rb, change, true);
		}
	}

	hash_destroy(txn->toast_hash);
	txn->toast_hash = NULL;
}

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferReturnTXN().

◆ ReorderBufferTransferSnapToParent()

/*
 * ReorderBufferTransferSnapToParent
 *		Hand subtxn's base snapshot to its top-level txn if it is older.
 *
 * Definition at line 1126 of file reorderbuffer.c.
 *
 * If subtxn has a base snapshot and txn has none (or a newer one, by
 * base_snapshot_lsn), txn's existing snapshot reference is dropped with
 * SnapBuildSnapDecRefcount() and removed from the base-snapshot list.  txn
 * then adopts subtxn's snapshot and takes subtxn's place in that list.
 * Otherwise subtxn's snapshot reference is simply dropped.  In both cases
 * subtxn ends up with no base snapshot and InvalidXLogRecPtr as its LSN.
 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
								  ReorderBufferTXN *subtxn)
{
	Assert(subtxn->toplevel_xid == txn->xid);

	if (subtxn->base_snapshot != NULL)
	{
		if (txn->base_snapshot == NULL ||
			subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
		{
			/*
			 * If the toplevel transaction already has a base snapshot but
			 * it's newer than the subxact's, purge it.
			 */
			if (txn->base_snapshot != NULL)
			{
				SnapBuildSnapDecRefcount(txn->base_snapshot);
				dlist_delete(&txn->base_snapshot_node);
			}

			/*
			 * The snapshot is now the top transaction's; transfer it, and
			 * adjust the list position of the top transaction in the list by
			 * moving it to where the subtransaction is.
			 */
			txn->base_snapshot = subtxn->base_snapshot;
			txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
			dlist_insert_before(&subtxn->base_snapshot_node,
								&txn->base_snapshot_node);

			/*
			 * The subtransaction doesn't have a snapshot anymore (so it
			 * mustn't be in the list.)
			 */
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
			dlist_delete(&subtxn->base_snapshot_node);
		}
		else
		{
			/* Base snap of toplevel is fine, so subxact's is not needed */
			SnapBuildSnapDecRefcount(subtxn->base_snapshot);
			dlist_delete(&subtxn->base_snapshot_node);
			subtxn->base_snapshot = NULL;
			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
		}
	}
}
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

/*
 * ReorderBufferTruncateTXN
 *		Discard the queued changes of txn and of its subtransactions while
 *		keeping the transaction entries themselves.
 *
 * Definition at line 1604 of file reorderbuffer.c.
 *
 * Unless txn_prepared, the top-level txn (and any subtxn that still had
 * in-memory changes) is flagged RBTXN_IS_STREAMED.  When txn_prepared, the
 * stored tuplecid changes are returned as well.  The tuplecid hash is
 * destroyed, spilled-to-disk data is removed (recording that via
 * RBTXN_IS_SERIALIZED_CLEAR), and the entry counters are reset to zero.
 */
static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
	dlist_mutable_iter iter;

	/* cleanup subtransactions & their changes */
	dlist_foreach_modify(iter, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

		/*
		 * Subtransactions are always associated to the toplevel TXN, even if
		 * they originally were happening inside another subtxn, so we won't
		 * ever recurse more than one level deep here.
		 */
		Assert(rbtxn_is_known_subxact(subtxn));
		Assert(subtxn->nsubtxns == 0);

		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
	}

	/* cleanup changes in the txn */
	dlist_foreach_modify(iter, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, iter.cur);

		/* Check we're not mixing changes from different transactions. */
		Assert(change->txn == txn);

		/* remove the change from it's containing list */
		dlist_delete(&change->node);

		ReorderBufferReturnChange(rb, change, true);
	}

	/*
	 * Mark the transaction as streamed.
	 *
	 * The top-level transaction, is marked as streamed always, even if it
	 * does not contain any changes (that is, when all the changes are in
	 * subtransactions).
	 *
	 * For subtransactions, we only mark them as streamed when there are
	 * changes in them.
	 *
	 * We do it this way because of aborts - we don't want to send aborts for
	 * XIDs the downstream is not aware of. And of course, it always knows
	 * about the toplevel xact (we send the XID in all messages), but we never
	 * stream XIDs of empty subxacts.
	 */
	if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
		txn->txn_flags |= RBTXN_IS_STREAMED;

	if (txn_prepared)
	{
		/*
		 * If this is a prepared txn, cleanup the tuplecids we stored for
		 * decoding catalog snapshot access. They are always stored in the
		 * toplevel transaction.
		 */
		dlist_foreach_modify(iter, &txn->tuplecids)
		{
			ReorderBufferChange *change;

			change = dlist_container(ReorderBufferChange, node, iter.cur);

			/* Check we're not mixing changes from different transactions. */
			Assert(change->txn == txn);
			Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

			/* Remove the change from its containing list. */
			dlist_delete(&change->node);

			ReorderBufferReturnChange(rb, change, true);
		}
	}

	/*
	 * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
	 * memory. We could also keep the hash table and update it with new ctid
	 * values, but this seems simpler and good enough for now.
	 */
	if (txn->tuplecid_hash != NULL)
	{
		hash_destroy(txn->tuplecid_hash);
		txn->tuplecid_hash = NULL;
	}

	/* If this txn is serialized then clean the disk space. */
	if (rbtxn_is_serialized(txn))
	{
		ReorderBufferRestoreCleanup(rb, txn);
		txn->txn_flags &= ~RBTXN_IS_SERIALIZED;

		/*
		 * We set this flag to indicate if the transaction is ever serialized.
		 * We need this to accurately update the stats as otherwise the same
		 * transaction can be counted as serialized multiple times.
		 */
		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
	}

	/* also reset the number of entries in the transaction */
	txn->nentries_mem = 0;
	txn->nentries = 0;
}
#define RBTXN_IS_STREAMED
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, RBTXN_IS_STREAMED, rbtxn_is_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), ReorderBufferResetTXN(), and ReorderBufferStreamCommit().

◆ ReorderBufferTXNByXid()

/*
 * ReorderBufferTXNByXid
 *		Look up the ReorderBufferTXN for xid, optionally creating it.
 *
 * Definition at line 614 of file reorderbuffer.c.
 *
 * A one-entry cache (rb->by_txn_last_xid / rb->by_txn_last_txn) is checked
 * before the rb->by_txn hash table; the cache also remembers "not found"
 * results.  When create is true, a missing entry is created with
 * first_lsn = lsn (which must be valid) and restart_decoding_lsn taken from
 * rb->current_restart_decoding_lsn; if create_as_top, it is also appended to
 * rb->toplevel_by_lsn.
 *
 * If is_new is non-NULL, *is_new is set to whether xid was absent.  It is
 * left untouched when a cached "not found" result returns NULL.  Returns
 * NULL only if the entry does not exist and create is false.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
	 * create an entry.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					&xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);
		Assert(lsn != InvalidXLogRecPtr);

		ent->txn = ReorderBufferGetTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		if (create_as_top)
		{
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || txn != NULL);
	return txn;
}
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferXidHasBaseSnapshot()

/*
 * ReorderBufferXidHasBaseSnapshot
 *		Report whether transaction xid has a base snapshot.
 *
 * Definition at line 3426 of file reorderbuffer.c.
 *
 * For a known subtransaction the answer comes from its top-level
 * transaction.  An xid unknown to the reorder buffer has no snapshot, so
 * false is returned; no entry is created.
 */
bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, false,
								NULL, InvalidXLogRecPtr, false);

	/* transaction isn't known yet, ergo no snapshot */
	if (txn == NULL)
		return false;

	/* a known subtxn? operate on top-level txn instead */
	if (rbtxn_is_known_subxact(txn))
		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
									NULL, InvalidXLogRecPtr, false);

	return txn->base_snapshot != NULL;
}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

/*
 * ReorderBufferXidHasCatalogChanges
 *		Report whether transaction xid is flagged as having catalog changes.
 *
 * Definition at line 3409 of file reorderbuffer.c.
 *
 * Returns false for an xid unknown to the reorder buffer; no entry is
 * created.
 */
bool
ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
								false);
	if (txn == NULL)
		return false;

	return rbtxn_has_catalog_changes(txn);
}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3336 of file reorderbuffer.c.

3338 {
3339  ReorderBufferTXN *txn;
3340 
3341  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3342 
3343  if (!rbtxn_has_catalog_changes(txn))
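3344  {
3345  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3346  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3347  }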
3348 
3349  /*
3350  * Mark top-level transaction as having catalog changes too if one of its
3351  * children has so that the ReorderBufferBuildTupleCidHash can
3352  * conveniently check just top-level transaction and decide whether to
3353  * build the hash table or not.
3354  */
3355  if (rbtxn_is_subtxn(txn))
3356  {
3357  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3358 
3359  if (!rbtxn_has_catalog_changes(toptxn))
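3360  {
3361  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3362  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3363  }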
3364  }
3365 }
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by heap_decode(), SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB *tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId *cmin,
CommandId *cmax 
)

Definition at line 5207 of file reorderbuffer.c.

5211 {
5212  ReorderBufferTupleCidKey key;
5213  ReorderBufferTupleCidEnt *ent;
5214  ForkNumber forkno;
5215  BlockNumber blockno;
5216  bool updated_mapping = false;
5217 
5218  /*
5219  * Return unresolved if tuplecid_data is not valid. That's because when
5220  * streaming in-progress transactions we may run into tuples with the CID
5221  * before actually decoding them. Think e.g. about INSERT followed by
5222  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5223  * INSERT. So in such cases, we assume the CID is from the future
5224  * command.
5225  */
5226  if (tuplecid_data == NULL)
5227  return false;
5228 
5229  /* be careful about padding */
5230  memset(&key, 0, sizeof(key));
5231 
5232  Assert(!BufferIsLocal(buffer));
5233 
5234  /*
5235  * get relfilelocator from the buffer, no convenient way to access it
5236  * other than that.
5237  */
5238  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5239 
5240  /* tuples can only be in the main fork */
5241  Assert(forkno == MAIN_FORKNUM);
5242  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5243 
5244  ItemPointerCopy(&htup->t_self,
5245  &key.tid);
5246 
5247 restart:
5248  ent = (ReorderBufferTupleCidEnt *)
5249  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5250 
5251  /*
5252  * failed to find a mapping, check whether the table was rewritten and
5253  * apply mapping if so, but only do that once - there can be no new
5254  * mappings while we are in here since we have to hold a lock on the
5255  * relation.
5256  */
5257  if (ent == NULL && !updated_mapping)
5258  {
5259  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5260  /* now check but don't update for a mapping again */
5261  updated_mapping = true;
5262  goto restart;
5263  }
5264  else if (ent == NULL)
5265  return false;
5266 
5267  if (cmin)
5268  *cmin = ent->cmin;
5269  if (cmax)
5270  *cmax = ent->cmax;
5271  return true;
5272 }
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:3398
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:48
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert(), BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), sort-test::key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inline static

Definition at line 1930 of file reorderbuffer.c.

1931 {
1932  /*
1933  * If the input transaction id is already set as a CheckXidAlive then
1934  * nothing to do.
1935  */
1936  if (TransactionIdEquals(CheckXidAlive, xid))
1937  return;
1938 
1939  /*
1940  * setup CheckXidAlive if it's not committed yet. We don't check if the
1941  * xid is aborted. That will happen during catalog access.
1942  */
1943  if (!TransactionIdDidCommit(xid))
1944  CheckXidAlive = xid;
1945  else
1946  CheckXidAlive = InvalidTransactionId;
1947 }
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4587 of file reorderbuffer.c.

4588 {
4589  DIR *logical_dir;
4590  struct dirent *logical_de;
4591 
4592  logical_dir = AllocateDir("pg_replslot");
4593  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4594  {
4595  if (strcmp(logical_de->d_name, ".") == 0 ||
4596  strcmp(logical_de->d_name, "..") == 0)
4597  continue;
4598 
4599  /* if it cannot be a slot, skip the directory */
4600  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4601  continue;
4602 
4603  /*
4604  * ok, has to be a surviving logical slot, iterate and delete
4605  * everything starting with xid-*
4606  */
4607  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4608  }
4609  FreeDir(logical_dir);
4610 }
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2909
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId *xip,
Size  num 
)
static

Definition at line 5106 of file reorderbuffer.c.

5107 {
5108  return bsearch(&xid, xip, num,
5109  sizeof(TransactionId), xidComparator) != NULL;
5110 }

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB *tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 5129 of file reorderbuffer.c.

5130 {
5131  DIR *mapping_dir;
5132  struct dirent *mapping_de;
5133  List *files = NIL;
5134  ListCell *file;
5135  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5136 
5137  mapping_dir = AllocateDir("pg_logical/mappings");
5138  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5139  {
5140  Oid f_dboid;
5141  Oid f_relid;
5142  TransactionId f_mapped_xid;
5143  TransactionId f_create_xid;
5144  XLogRecPtr f_lsn;
5145  uint32 f_hi,
5146  f_lo;
5147  RewriteMappingFile *f;
5148 
5149  if (strcmp(mapping_de->d_name, ".") == 0 ||
5150  strcmp(mapping_de->d_name, "..") == 0)
5151  continue;
5152 
5153  /* Ignore files that aren't ours */
5154  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5155  continue;
5156 
5157  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5158  &f_dboid, &f_relid, &f_hi, &f_lo,
5159  &f_mapped_xid, &f_create_xid) != 6)
5160  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5161 
5162  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5163 
5164  /* mapping for another database */
5165  if (f_dboid != dboid)
5166  continue;
5167 
5168  /* mapping for another relation */
5169  if (f_relid != relid)
5170  continue;
5171 
5172  /* did the creating transaction abort? */
5173  if (!TransactionIdDidCommit(f_create_xid))
5174  continue;
5175 
5176  /* not for our transaction */
5177  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5178  continue;
5179 
5180  /* ok, relevant, queue for apply */
5181  f = palloc(sizeof(RewriteMappingFile));
5182  f->lsn = f_lsn;
5183  strcpy(f->fname, mapping_de->d_name);
5184  files = lappend(files, f);
5185  }
5186  FreeDir(mapping_dir);
5187 
5188  /* sort files so we apply them in LSN order */
5189  list_sort(files, file_sort_by_lsn);
5190 
5191  foreach(file, files)
5192  {
5193  RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5194 
5195  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5196  snapshot->subxip[0]);
5197  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5198  pfree(f);
5199  }
5200 }
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:243
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:91
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
List * lappend(List *list, void *datum)
Definition: list.c:339
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

Definition at line 212 of file reorderbuffer.c.

Referenced by pa_send_data(), and ReorderBufferCheckMemoryLimit().

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 208 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 209 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().