PostgreSQL Source Code  git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 

Macro Definition Documentation

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
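Value:
( \
 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)
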
Definition at line 190 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
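( \
 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)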
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:65
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:66

Definition at line 185 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:
( \
 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)

Definition at line 181 of file reorderbuffer.c.

Typedef Documentation

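◆ ReorderBufferDiskChange

typedef struct ReorderBufferDiskChange ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

typedef struct ReorderBufferToastEnt ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

typedef struct RewriteMappingFile RewriteMappingFile
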
◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB * tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 4892 of file reorderbuffer.c.

4893 {
4894  char path[MAXPGPATH];
4895  int fd;
4896  int readBytes;
4897  LogicalRewriteMappingData map;
4898 
4899  sprintf(path, "pg_logical/mappings/%s", fname);
4900  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4901  if (fd < 0)
4902  ereport(ERROR,
4903  (errcode_for_file_access(),
4904  errmsg("could not open file \"%s\": %m", path)));
4905 
4906  while (true)
4907  {
4908  ReorderBufferTupleCidKey key;
4909  ReorderBufferTupleCidEnt *ent;
4910  ReorderBufferTupleCidEnt *new_ent;
4911  bool found;
4912 
4913  /* be careful about padding */
4914  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4915 
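4916  /* read all mappings till the end of the file */
4917  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
4918  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4919  pgstat_report_wait_end();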
4920 
4921  if (readBytes < 0)
4922  ereport(ERROR,
4923  (errcode_for_file_access(),
4924  errmsg("could not read file \"%s\": %m",
4925  path)));
4926  else if (readBytes == 0) /* EOF */
4927  break;
4928  else if (readBytes != sizeof(LogicalRewriteMappingData))
4929  ereport(ERROR,
4930  (errcode_for_file_access(),
4931  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4932  path, readBytes,
4933  (int32) sizeof(LogicalRewriteMappingData))));
4934 
4935  key.relnode = map.old_node;
4936  ItemPointerCopy(&map.old_tid,
4937  &key.tid);
4938 
4939 
4940  ent = (ReorderBufferTupleCidEnt *)
4941  hash_search(tuplecid_data,
4942  (void *) &key,
4943  HASH_FIND,
4944  NULL);
4945 
4946  /* no existing mapping, no need to update */
4947  if (!ent)
4948  continue;
4949 
4950  key.relnode = map.new_node;
4951  ItemPointerCopy(&map.new_tid,
4952  &key.tid);
4953 
4954  new_ent = (ReorderBufferTupleCidEnt *)
4955  hash_search(tuplecid_data,
4956  (void *) &key,
4957  HASH_ENTER,
4958  &found);
4959 
4960  if (found)
4961  {
4962  /*
4963  * Make sure the existing mapping makes sense. We sometime update
4964  * old records that did not yet have a cmax (e.g. pg_class' own
4965  * entry while rewriting it) during rewrites, so allow that.
4966  */
4967  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4968  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4969  }
4970  else
4971  {
4972  /* update mapping */
4973  new_ent->cmin = ent->cmin;
4974  new_ent->cmax = ent->cmax;
4975  new_ent->combocid = ent->combocid;
4976  }
4977  }
4978 
4979  if (CloseTransientFile(fd) != 0)
4980  ereport(ERROR,
4981  (errcode_for_file_access(),
4982  errmsg("could not close file \"%s\": %m", path)));
4983 }
#define InvalidCommandId
Definition: c.h:604
signed int int32
Definition: c.h:429
#define PG_BINARY
Definition: c.h:1268
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
int errcode_for_file_access(void)
Definition: elog.c:716
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
int CloseTransientFile(int fd)
Definition: fd.c:2688
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2511
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
Assert(fmt[strlen(fmt) - 1] !='\n')
#define MAXPGPATH
#define sprintf
Definition: port.h:227
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static HTAB * tuplecid_data
Definition: snapmgr.c:116
ItemPointerData new_tid
Definition: rewriteheap.h:40
ItemPointerData old_tid
Definition: rewriteheap.h:39
@ WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ
Definition: wait_event.h:200
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:266
static void pgstat_report_wait_end(void)
Definition: wait_event.h:282

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, tuplecid_data, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN * txn)
static

Definition at line 935 of file reorderbuffer.c.

936 {
937 #ifdef USE_ASSERT_CHECKING
938  dlist_iter iter;
939  XLogRecPtr prev_lsn = txn->first_lsn;
940 
941  dlist_foreach(iter, &txn->changes)
942  {
943  ReorderBufferChange *cur_change;
944 
945  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
946 
947  Assert(txn->first_lsn != InvalidXLogRecPtr);
948  Assert(cur_change->lsn != InvalidXLogRecPtr);
949  Assert(txn->first_lsn <= cur_change->lsn);
950 
951  if (txn->end_lsn != InvalidXLogRecPtr)
952  Assert(cur_change->lsn <= txn->end_lsn);
953 
954  Assert(prev_lsn <= cur_change->lsn);
955 
956  prev_lsn = cur_change->lsn;
957  }
958 #endif
959 }
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer * rb)
static

Definition at line 878 of file reorderbuffer.c.

879 {
880 #ifdef USE_ASSERT_CHECKING
881  dlist_iter iter;
882  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
883  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
884 
885  dlist_foreach(iter, &rb->toplevel_by_lsn)
886  {
887  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
888  iter.cur);
889 
890  /* start LSN must be set */
891  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
892 
893  /* If there is an end LSN, it must be higher than start LSN */
894  if (cur_txn->end_lsn != InvalidXLogRecPtr)
895  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
896 
897  /* Current initial LSN must be strictly higher than previous */
898  if (prev_first_lsn != InvalidXLogRecPtr)
899  Assert(prev_first_lsn < cur_txn->first_lsn);
900 
901  /* known-as-subtxn txns must not be listed */
902  Assert(!rbtxn_is_known_subxact(cur_txn));
903 
904  prev_first_lsn = cur_txn->first_lsn;
905  }
906 
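907  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
908  {
909  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,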
910  base_snapshot_node,
911  iter.cur);
912 
913  /* base snapshot (and its LSN) must be set */
914  Assert(cur_txn->base_snapshot != NULL);
915  Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
916 
917  /* current LSN must be strictly higher than previous */
918  if (prev_base_snap_lsn != InvalidXLogRecPtr)
919  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
920 
921  /* known-as-subtxn txns must not be listed */
922  Assert(!rbtxn_is_known_subxact(cur_txn));
923 
924  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
925  }
926 #endif
927 }
#define rbtxn_is_known_subxact(txn)
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell * a_p,
const ListCell * b_p 
)
static

Definition at line 5000 of file reorderbuffer.c.

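5001 {
5002  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5003  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);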
5004 
5005  if (a->lsn < b->lsn)
5006  return -1;
5007  else if (a->lsn > b->lsn)
5008  return 1;
5009  return 0;
5010 }
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define lfirst(lc)
Definition: pg_list.h:169

References a, b, and lfirst.

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2829 of file reorderbuffer.c.

2830 {
2831  ReorderBufferTXN *txn;
2832 
2833  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2834  false);
2835 
2836  /* unknown, nothing to remove */
2837  if (txn == NULL)
2838  return;
2839 
2840  /* For streamed transactions notify the remote node about the abort. */
2841  if (rbtxn_is_streamed(txn))
2842  {
2843  rb->stream_abort(rb, txn, lsn);
2844 
2845  /*
2846  * We might have decoded changes for this transaction that could load
2847  * the cache as per the current transaction's view (consider DDL's
2848  * happened in this transaction). We don't want the decoding of future
2849  * transactions to use those cache entries so execute invalidations.
2850  */
2851  if (txn->ninvalidations > 0)
2852  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2853  txn->invalidations);
2854  }
2855 
2856  /* cosmetic... */
2857  txn->final_lsn = lsn;
2858 
2859  /* remove potential on-disk data, and deallocate */
2860  ReorderBufferCleanupTXN(rb, txn);
2861 }
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
XLogRecPtr final_lsn
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 2871 of file reorderbuffer.c.

2872 {
2873  dlist_mutable_iter it;
2874 
2875  /*
2876  * Iterate through all (potential) toplevel TXNs and abort all that are
2877  * older than what possibly can be running. Once we've found the first
2878  * that is alive we stop, there might be some that acquired an xid earlier
2879  * but started writing later, but it's unlikely and they will be cleaned
2880  * up in a later call to this function.
2881  */
2882  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2883  {
2884  ReorderBufferTXN *txn;
2885 
2886  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2887 
2888  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2889  {
2890  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2891 
2892  /* remove potential on-disk data, and deallocate this tx */
2893  ReorderBufferCleanupTXN(rb, txn);
2894  }
2895  else
2896  return;
2897  }
2898 }
#define DEBUG2
Definition: elog.h:23
#define elog(elevel,...)
Definition: elog.h:218
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
TransactionId xid
dlist_node * cur
Definition: ilist.h:180
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3203 of file reorderbuffer.c.

3206 {
3207  ReorderBufferTXN *txn;
3208  MemoryContext oldcontext;
3209  ReorderBufferChange *change;
3210 
3211  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3212 
3213  oldcontext = MemoryContextSwitchTo(rb->context);
3214 
3215  /*
3216  * Collect all the invalidations under the top transaction so that we can
3217  * execute them all together. See comment atop this function
3218  */
3219  if (txn->toptxn)
3220  txn = txn->toptxn;
3221 
3222  Assert(nmsgs > 0);
3223 
3224  /* Accumulate invalidations. */
3225  if (txn->ninvalidations == 0)
3226  {
3227  txn->ninvalidations = nmsgs;
3228  txn->invalidations = (SharedInvalidationMessage *)
3229  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3230  memcpy(txn->invalidations, msgs,
3231  sizeof(SharedInvalidationMessage) * nmsgs);
3232  }
3233  else
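3234  {
3235  txn->invalidations = (SharedInvalidationMessage *)
3236  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *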
3237  (txn->ninvalidations + nmsgs));
3238 
3239  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3240  nmsgs * sizeof(SharedInvalidationMessage));
3241  txn->ninvalidations += nmsgs;
3242  }
3243 
3244  change = ReorderBufferGetChange(rb);
3245  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3246  change->data.inval.ninvalidations = nmsgs;
3247  change->data.inval.invalidations = (SharedInvalidationMessage *)
3248  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3249  memcpy(change->data.inval.invalidations, msgs,
3250  sizeof(SharedInvalidationMessage) * nmsgs);
3251 
3252  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3253 
3254  MemoryContextSwitchTo(oldcontext);
3255 }
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1188
void * palloc(Size size)
Definition: mcxt.c:1068
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:60
ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@105 data
struct ReorderBufferChange::@105::@110 inval
struct ReorderBufferTXN * toptxn
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, palloc(), REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), repalloc(), and ReorderBufferTXN::toptxn.

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3167 of file reorderbuffer.c.

3171 {
3172  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3173  ReorderBufferTXN *txn;
3174 
3175  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3176 
3177  change->data.tuplecid.node = node;
3178  change->data.tuplecid.tid = tid;
3179  change->data.tuplecid.cmin = cmin;
3180  change->data.tuplecid.cmax = cmax;
3181  change->data.tuplecid.combocid = combocid;
3182  change->lsn = lsn;
3183  change->txn = txn;
3184  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3185 
3186  dlist_push_tail(&txn->tuplecids, &change->node);
3187  txn->ntuplecids++;
3188 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:63
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
struct ReorderBufferChange::@105::@109 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 301 of file reorderbuffer.c.

302 {
303  ReorderBuffer *buffer;
304  HASHCTL hash_ctl;
305  MemoryContext new_ctx;
306 
307  Assert(MyReplicationSlot != NULL);
308 
309  /* allocate memory in own context, to have better accountability */
310  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
311  "ReorderBuffer",
312  ALLOCSET_DEFAULT_SIZES);
313 
314  buffer =
315  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
316 
317  memset(&hash_ctl, 0, sizeof(hash_ctl));
318 
319  buffer->context = new_ctx;
320 
321  buffer->change_context = SlabContextCreate(new_ctx,
322  "Change",
323  SLAB_DEFAULT_BLOCK_SIZE,
324  sizeof(ReorderBufferChange));
325 
326  buffer->txn_context = SlabContextCreate(new_ctx,
327  "TXN",
328  SLAB_DEFAULT_BLOCK_SIZE,
329  sizeof(ReorderBufferTXN));
330 
331  /*
332  * XXX the allocation sizes used below pre-date generation context's block
333  * growing code. These values should likely be benchmarked and set to
334  * more suitable values.
335  */
336  buffer->tup_context = GenerationContextCreate(new_ctx,
337  "Tuples",
338  SLAB_LARGE_BLOCK_SIZE,
339  SLAB_LARGE_BLOCK_SIZE,
340  SLAB_LARGE_BLOCK_SIZE);
341 
342  hash_ctl.keysize = sizeof(TransactionId);
343  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
344  hash_ctl.hcxt = buffer->context;
345 
346  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
347  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
348 
349  buffer->by_txn_last_xid = InvalidTransactionId;
350  buffer->by_txn_last_txn = NULL;
351 
352  buffer->outbuf = NULL;
353  buffer->outbufsize = 0;
354  buffer->size = 0;
355 
356  buffer->spillTxns = 0;
357  buffer->spillCount = 0;
358  buffer->spillBytes = 0;
359  buffer->streamTxns = 0;
360  buffer->streamCount = 0;
361  buffer->streamBytes = 0;
362  buffer->totalTxns = 0;
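363  buffer->totalBytes = 0;
364 
365  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
366 
367  dlist_init(&buffer->toplevel_by_lsn);
368  dlist_init(&buffer->txns_by_base_snapshot_lsn);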
369 
370  /*
371  * Ensure there's no stale data from prior uses of this slot, in case some
372  * prior exit avoided calling ReorderBufferFree. Failure to do this can
373  * produce duplicated txns, and it's very cheap if there's nothing there.
374  */
375  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
376 
377  return buffer;
378 }
#define NameStr(name)
Definition: c.h:681
uint32 TransactionId
Definition: c.h:587
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:215
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:226
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:227
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
ReplicationSlot * MyReplicationSlot
Definition: slot.c:97
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:147
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, ReorderBufferCleanupSerializedTXNs(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SLAB_LARGE_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Relation  relation,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1917 of file reorderbuffer.c.

1920 {
1921  if (streaming)
1922  rb->stream_change(rb, txn, relation, change);
1923  else
1924  rb->apply_change(rb, txn, relation, change);
1925 }
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1945 of file reorderbuffer.c.

1947 {
1948  if (streaming)
1949  rb->stream_message(rb, txn, change->lsn, true,
1950  change->data.msg.prefix,
1951  change->data.msg.message_size,
1952  change->data.msg.message);
1953  else
1954  rb->message(rb, txn, change->lsn, true,
1955  change->data.msg.prefix,
1956  change->data.msg.message_size,
1957  change->data.msg.message);
1958 }
struct ReorderBufferChange::@105::@108 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBuffer::message, ReorderBufferChange::msg, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  nrelations,
Relation *  relations,
ReorderBufferChange *  change,
bool  streaming 
)
inline static

Definition at line 1931 of file reorderbuffer.c.

1934 {
1935  if (streaming)
1936  rb->stream_truncate(rb, txn, nrelations, relations, change);
1937  else
1938  rb->apply_truncate(rb, txn, nrelations, relations, change);
1939 }
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1021 of file reorderbuffer.c.

1023 {
1024  ReorderBufferTXN *txn;
1025  ReorderBufferTXN *subtxn;
1026  bool new_top;
1027  bool new_sub;
1028 
1029  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1030  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1031 
1032  if (!new_sub)
1033  {
1034  if (rbtxn_is_known_subxact(subtxn))
1035  {
1036  /* already associated, nothing to do */
1037  return;
1038  }
1039  else
1040  {
1041  /*
1042  * We already saw this transaction, but initially added it to the
1043  * list of top-level txns. Now that we know it's not top-level,
1044  * remove it from there.
1045  */
1046  dlist_delete(&subtxn->node);
1047  }
1048  }
1049 
1050  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1051  subtxn->toplevel_xid = xid;
1052  Assert(subtxn->nsubtxns == 0);
1053 
1054  /* set the reference to top-level transaction */
1055  subtxn->toptxn = txn;
1056 
1057  /* add to subtransaction list */
1058  dlist_push_tail(&txn->subtxns, &subtxn->node);
1059  txn->nsubtxns++;
1060 
1061  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1062  ReorderBufferTransferSnapToParent(txn, subtxn);
1063 
1064  /* Verify LSN-ordering invariant */
1065  AssertTXNLsnOrder(rb);
1066 }
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
dlist_head subtxns

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1680 of file reorderbuffer.c.

1681 {
1682  dlist_iter iter;
1683  HASHCTL hash_ctl;
1684 
1686  return;
1687 
1688  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1689  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1690  hash_ctl.hcxt = rb->context;
1691 
1692  /*
1693  * create the hash with the exact number of to-be-stored tuplecids from
1694  * the start
1695  */
1696  txn->tuplecid_hash =
1697  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1699 
1700  dlist_foreach(iter, &txn->tuplecids)
1701  {
1704  bool found;
1705  ReorderBufferChange *change;
1706 
1707  change = dlist_container(ReorderBufferChange, node, iter.cur);
1708 
1710 
1711  /* be careful about padding */
1712  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1713 
1714  key.relnode = change->data.tuplecid.node;
1715 
1716  ItemPointerCopy(&change->data.tuplecid.tid,
1717  &key.tid);
1718 
1719  ent = (ReorderBufferTupleCidEnt *)
1721  (void *) &key,
1722  HASH_ENTER,
1723  &found);
1724  if (!found)
1725  {
1726  ent->cmin = change->data.tuplecid.cmin;
1727  ent->cmax = change->data.tuplecid.cmax;
1728  ent->combocid = change->data.tuplecid.combocid;
1729  }
1730  else
1731  {
1732  /*
1733  * Maybe we already saw this tuple before in this transaction, but
1734  * if so it must have the same cmin.
1735  */
1736  Assert(ent->cmin == change->data.tuplecid.cmin);
1737 
1738  /*
1739  * cmax may be initially invalid, but once set it can only grow,
1740  * and never become invalid again.
1741  */
1742  Assert((ent->cmax == InvalidCommandId) ||
1743  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1744  (change->data.tuplecid.cmax > ent->cmax)));
1745  ent->cmax = change->data.tuplecid.cmax;
1746  }
1747  }
1748 }
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, sort-test::key, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer *  rb)
inline static

Definition at line 3839 of file reorderbuffer.c.

3840 {
3842  SnapBuild *builder = ctx->snapshot_builder;
3843 
3844  /* We can't start streaming unless a consistent state is reached. */
3846  return false;
3847 
3848  /*
3849  * Even if streaming is enabled, we can't start streaming immediately when
3850  * we have previously decoded this transaction and are now just
3851  * restarting.
3852  */
3853  if (ReorderBufferCanStream(rb) &&
3854  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3855  return true;
3856 
3857  return false;
3858 }
static bool ReorderBufferCanStream(ReorderBuffer *rb)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:394
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References XLogReaderState::EndRecPtr, ReorderBuffer::private_data, LogicalDecodingContext::reader, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer *  rb)
inline static

Definition at line 3830 of file reorderbuffer.c.

3831 {
3833 
3834  return ctx->streaming;
3835 }

References ReorderBuffer::private_data, and LogicalDecodingContext::streaming.

Referenced by ReorderBufferCanStartStreaming(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer *  rb,
ReorderBufferChange *  change,
bool  addition,
Size  sz 
)
static

Definition at line 3110 of file reorderbuffer.c.

3113 {
3114  ReorderBufferTXN *txn;
3115  ReorderBufferTXN *toptxn;
3116 
3117  Assert(change->txn);
3118 
3119  /*
3120  * Ignore tuple CID changes, because those are not evicted when reaching
3121  * memory limit. So we just don't count them, because it might easily
3122  * trigger a pointless attempt to spill.
3123  */
3125  return;
3126 
3127  txn = change->txn;
3128 
3129  /*
3130  * Update the total size in top level as well. This is later used to
3131  * compute the decoding stats.
3132  */
3133  if (txn->toptxn != NULL)
3134  toptxn = txn->toptxn;
3135  else
3136  toptxn = txn;
3137 
3138  if (addition)
3139  {
3140  txn->size += sz;
3141  rb->size += sz;
3142 
3143  /* Update the total size in the top transaction. */
3144  toptxn->total_size += sz;
3145  }
3146  else
3147  {
3148  Assert((rb->size >= sz) && (txn->size >= sz));
3149  txn->size -= sz;
3150  rb->size -= sz;
3151 
3152  /* Update the total size in the top transaction. */
3153  toptxn->total_size -= sz;
3154  }
3155 
3156  Assert(txn->size <= rb->size);
3157 }

References ReorderBufferChange::action, Assert(), REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and ReorderBufferChange::txn.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange *  change)
static

Definition at line 3982 of file reorderbuffer.c.

3983 {
3984  Size sz = sizeof(ReorderBufferChange);
3985 
3986  switch (change->action)
3987  {
3988  /* fall through these, they're all similar enough */
3993  {
3994  ReorderBufferTupleBuf *oldtup,
3995  *newtup;
3996  Size oldlen = 0;
3997  Size newlen = 0;
3998 
3999  oldtup = change->data.tp.oldtuple;
4000  newtup = change->data.tp.newtuple;
4001 
4002  if (oldtup)
4003  {
4004  sz += sizeof(HeapTupleData);
4005  oldlen = oldtup->tuple.t_len;
4006  sz += oldlen;
4007  }
4008 
4009  if (newtup)
4010  {
4011  sz += sizeof(HeapTupleData);
4012  newlen = newtup->tuple.t_len;
4013  sz += newlen;
4014  }
4015 
4016  break;
4017  }
4019  {
4020  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4021 
4022  sz += prefix_size + change->data.msg.message_size +
4023  sizeof(Size) + sizeof(Size);
4024 
4025  break;
4026  }
4028  {
4029  sz += sizeof(SharedInvalidationMessage) *
4030  change->data.inval.ninvalidations;
4031  break;
4032  }
4034  {
4035  Snapshot snap;
4036 
4037  snap = change->data.snapshot;
4038 
4039  sz += sizeof(SnapshotData) +
4040  sizeof(TransactionId) * snap->xcnt +
4041  sizeof(TransactionId) * snap->subxcnt;
4042 
4043  break;
4044  }
4046  {
4047  sz += sizeof(Oid) * change->data.truncate.nrelids;
4048 
4049  break;
4050  }
4055  /* ReorderBufferChange contains everything important */
4056  break;
4057  }
4058 
4059  return sz;
4060 }
size_t Size
Definition: c.h:540
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:67
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:58
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@105::@107 truncate
struct ReorderBufferChange::@105::@106 tp
HeapTupleData tuple
Definition: reorderbuffer.h:29
int32 subxcnt
Definition: snapshot.h:181
uint32 xcnt
Definition: snapshot.h:169

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, and SnapshotData::xcnt.

Referenced by ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), and ReorderBufferToastReplace().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer *  rb)
static

Definition at line 3458 of file reorderbuffer.c.

3459 {
3460  ReorderBufferTXN *txn;
3461 
3462  /* bail out if we haven't exceeded the memory limit */
3463  if (rb->size < logical_decoding_work_mem * 1024L)
3464  return;
3465 
3466  /*
3467  * Loop until we drop back under the memory limit. One might think that just
3468  * by evicting the largest (sub)transaction we will come under the memory
3469  * limit based on assumption that the selected transaction is at least as
3470  * large as the most recent change (which caused us to go over the memory
3471  * limit). However, that is not true because a user can reduce the
3472  * logical_decoding_work_mem to a smaller value before the most recent
3473  * change.
3474  */
3475  while (rb->size >= logical_decoding_work_mem * 1024L)
3476  {
3477  /*
3478  * Pick the largest transaction (or subtransaction) and evict it from
3479  * memory by streaming, if possible. Otherwise, spill to disk.
3480  */
3482  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3483  {
3484  /* we know there has to be one, because the size is not zero */
3485  Assert(txn && !txn->toptxn);
3486  Assert(txn->total_size > 0);
3487  Assert(rb->size >= txn->total_size);
3488 
3489  ReorderBufferStreamTXN(rb, txn);
3490  }
3491  else
3492  {
3493  /*
3494  * Pick the largest transaction (or subtransaction) and evict it
3495  * from memory by serializing it to disk.
3496  */
3497  txn = ReorderBufferLargestTXN(rb);
3498 
3499  /* we know there has to be one, because the size is not zero */
3500  Assert(txn);
3501  Assert(txn->size > 0);
3502  Assert(rb->size >= txn->size);
3503 
3504  ReorderBufferSerializeTXN(rb, txn);
3505  }
3506 
3507  /*
3508  * After eviction, the transaction should have no entries in memory,
3509  * and should use 0 bytes for changes.
3510  */
3511  Assert(txn->size == 0);
3512  Assert(txn->nentries_mem == 0);
3513  }
3514 
3515  /* We must be under the memory limit now. */
3516  Assert(rb->size < logical_decoding_work_mem * 1024L);
3517 }
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
int logical_decoding_work_mem
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

References Assert(), logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, ReorderBufferCanStartStreaming(), ReorderBufferLargestTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::toptxn, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4406 of file reorderbuffer.c.

4407 {
4408  DIR *spill_dir;
4409  struct dirent *spill_de;
4410  struct stat statbuf;
4411  char path[MAXPGPATH * 2 + 12];
4412 
4413  sprintf(path, "pg_replslot/%s", slotname);
4414 
4415  /* we're only handling directories here, skip if it's not ours */
4416  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4417  return;
4418 
4419  spill_dir = AllocateDir(path);
4420  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4421  {
4422  /* only look at names that can be ours */
4423  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4424  {
4425  snprintf(path, sizeof(path),
4426  "pg_replslot/%s/%s", slotname,
4427  spill_de->d_name);
4428 
4429  if (unlink(path) != 0)
4430  ereport(ERROR,
4432  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4433  path, slotname)));
4434  }
4435  }
4436  FreeDir(spill_dir);
4437 }
#define INFO
Definition: elog.h:28
int FreeDir(DIR *dir)
Definition: fd.c:2840
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2803
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2722
#define snprintf
Definition: port.h:225
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:324

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 1457 of file reorderbuffer.c.

1458 {
1459  bool found;
1460  dlist_mutable_iter iter;
1461 
1462  /* cleanup subtransactions & their changes */
1463  dlist_foreach_modify(iter, &txn->subtxns)
1464  {
1465  ReorderBufferTXN *subtxn;
1466 
1467  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1468 
1469  /*
1470  * Subtransactions are always associated to the toplevel TXN, even if
1471  * they originally were happening inside another subtxn, so we won't
1472  * ever recurse more than one level deep here.
1473  */
1474  Assert(rbtxn_is_known_subxact(subtxn));
1475  Assert(subtxn->nsubtxns == 0);
1476 
1477  ReorderBufferCleanupTXN(rb, subtxn);
1478  }
1479 
1480  /* cleanup changes in the txn */
1481  dlist_foreach_modify(iter, &txn->changes)
1482  {
1483  ReorderBufferChange *change;
1484 
1485  change = dlist_container(ReorderBufferChange, node, iter.cur);
1486 
1487  /* Check we're not mixing changes from different transactions. */
1488  Assert(change->txn == txn);
1489 
1490  ReorderBufferReturnChange(rb, change, true);
1491  }
1492 
1493  /*
1494  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1495  * They are always stored in the toplevel transaction.
1496  */
1497  dlist_foreach_modify(iter, &txn->tuplecids)
1498  {
1499  ReorderBufferChange *change;
1500 
1501  change = dlist_container(ReorderBufferChange, node, iter.cur);
1502 
1503  /* Check we're not mixing changes from different transactions. */
1504  Assert(change->txn == txn);
1506 
1507  ReorderBufferReturnChange(rb, change, true);
1508  }
1509 
1510  /*
1511  * Cleanup the base snapshot, if set.
1512  */
1513  if (txn->base_snapshot != NULL)
1514  {
1517  }
1518 
1519  /*
1520  * Cleanup the snapshot for the last streamed run.
1521  */
1522  if (txn->snapshot_now != NULL)
1523  {
1524  Assert(rbtxn_is_streamed(txn));
1526  }
1527 
1528  /*
1529  * Remove TXN from its containing list.
1530  *
1531  * Note: if txn is known as subxact, we are deleting the TXN from its
1532  * parent's list of known subxacts; this leaves the parent's nsubtxns
1533  * count too high, but we don't care. Otherwise, we are deleting the TXN
1534  * from the LSN-ordered list of toplevel TXNs.
1535  */
1536  dlist_delete(&txn->node);
1537 
1538  /* now remove reference from buffer */
1539  hash_search(rb->by_txn,
1540  (void *) &txn->xid,
1541  HASH_REMOVE,
1542  &found);
1543  Assert(found);
1544 
1545  /* remove entries spilled to disk */
1546  if (rbtxn_is_serialized(txn))
1547  ReorderBufferRestoreCleanup(rb, txn);
1548 
1549  /* deallocate */
1550  ReorderBufferReturnTXN(rb, txn);
1551 }
@ HASH_REMOVE
Definition: hsearch.h:115
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:418
Snapshot snapshot_now
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2638 of file reorderbuffer.c.

2642 {
2643  ReorderBufferTXN *txn;
2644 
2645  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2646  false);
2647 
2648  /* unknown transaction, nothing to replay */
2649  if (txn == NULL)
2650  return;
2651 
2652  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2653  origin_id, origin_lsn);
2654 }
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer *  rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1141 of file reorderbuffer.c.

1144 {
1145  ReorderBufferTXN *subtxn;
1146 
1147  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1148  InvalidXLogRecPtr, false);
1149 
1150  /*
1151  * No need to do anything if that subtxn didn't contain any changes
1152  */
1153  if (!subtxn)
1154  return;
1155 
1156  subtxn->final_lsn = commit_lsn;
1157  subtxn->end_lsn = end_lsn;
1158 
1159  /*
1160  * Assign this subxact as a child of the toplevel xact (no-op if already
1161  * done.)
1162  */
1163  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1164 }
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer *  rb,
Snapshot  orig_snap,
ReorderBufferTXN *  txn,
CommandId  cid 
)
static

Definition at line 1756 of file reorderbuffer.c.

1758 {
1759  Snapshot snap;
1760  dlist_iter iter;
1761  int i = 0;
1762  Size size;
1763 
1764  size = sizeof(SnapshotData) +
1765  sizeof(TransactionId) * orig_snap->xcnt +
1766  sizeof(TransactionId) * (txn->nsubtxns + 1);
1767 
1768  snap = MemoryContextAllocZero(rb->context, size);
1769  memcpy(snap, orig_snap, sizeof(SnapshotData));
1770 
1771  snap->copied = true;
1772  snap->active_count = 1; /* mark as active so nobody frees it */
1773  snap->regd_count = 0;
1774  snap->xip = (TransactionId *) (snap + 1);
1775 
1776  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1777 
1778  /*
1779  * snap->subxip contains all txids that belong to our transaction which we
1780  * need to check via cmin/cmax. That's why we store the toplevel
1781  * transaction in there as well.
1782  */
1783  snap->subxip = snap->xip + snap->xcnt;
1784  snap->subxip[i++] = txn->xid;
1785 
1786  /*
1787  * subxcnt isn't decreased when subtransactions abort, so count manually.
1788  * Since it's an upper boundary it is safe to use it for the allocation
1789  * above.
1790  */
1791  snap->subxcnt = 1;
1792 
1793  dlist_foreach(iter, &txn->subtxns)
1794  {
1795  ReorderBufferTXN *sub_txn;
1796 
1797  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1798  snap->subxip[i++] = sub_txn->xid;
1799  snap->subxcnt++;
1800  }
1801 
1802  /* sort so we can bsearch() later */
1803  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1804 
1805  /* store the specified current CommandId */
1806  snap->curcid = cid;
1807 
1808  return snap;
1809 }
int i
Definition: isn.c:73
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
#define qsort(a, b, c, d)
Definition: port.h:495
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:136

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage *  msgs 
)
static

Definition at line 3262 of file reorderbuffer.c.

3263 {
3264  int i;
3265 
3266  for (i = 0; i < nmsgs; i++)
3268 }
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:615

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2744 of file reorderbuffer.c.

2749 {
2750  ReorderBufferTXN *txn;
2751  XLogRecPtr prepare_end_lsn;
2752  TimestampTz prepare_time;
2753 
2754  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2755 
2756  /* unknown transaction, nothing to do */
2757  if (txn == NULL)
2758  return;
2759 
2760  /*
2761  * By this time the txn has the prepare record information; remember it
2762  * so that it can be used later for rollback.
2763  */
2764  prepare_end_lsn = txn->end_lsn;
2765  prepare_time = txn->xact_time.prepare_time;
2766 
2767  /* add the gid in the txn */
2768  txn->gid = pstrdup(gid);
2769 
2770  /*
2771  * It is possible that this transaction is not decoded at prepare time
2772  * either because by that time we didn't have a consistent snapshot, or
2773  * two_phase was not enabled, or it was decoded earlier but we have
2774  * restarted. We only need to send the prepare if it was not decoded
2775  * earlier. We don't need to decode the xact for aborts if it is not done
2776  * already.
2777  */
2778  if ((txn->final_lsn < two_phase_at) && is_commit)
2779  {
2780  txn->txn_flags |= RBTXN_PREPARE;
2781 
2782  /*
2783  * The prepare info must have been updated in txn even if we skip
2784  * prepare.
2785  */
2787 
2788  /*
2789  * By this time the txn has the prepare record information and it is
2790  * important to use that so that downstream gets the accurate
2791  * information. If we passed the commit information here instead,
2792  * the downstream could behave as if it had already replayed the commit
2793  * prepared after the restart.
2794  */
2795  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2796  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2797  }
2798 
2799  txn->final_lsn = commit_lsn;
2800  txn->end_lsn = end_lsn;
2801  txn->xact_time.commit_time = commit_time;
2802  txn->origin_id = origin_id;
2803  txn->origin_lsn = origin_lsn;
2804 
2805  if (is_commit)
2806  rb->commit_prepared(rb, txn, commit_lsn);
2807  else
2808  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2809 
2810  /* cleanup: make sure there's no cache pollution */
2812  txn->invalidations);
2813  ReorderBufferCleanupTXN(rb, txn);
2814 }
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1305
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
union ReorderBufferTXN::@111 xact_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2914 of file reorderbuffer.c.

2915 {
2916  ReorderBufferTXN *txn;
2917 
2918  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2919  false);
2920 
2921  /* unknown, nothing to forget */
2922  if (txn == NULL)
2923  return;
2924 
2925  /* For streamed transactions notify the remote node about the abort. */
2926  if (rbtxn_is_streamed(txn))
2927  rb->stream_abort(rb, txn, lsn);
2928 
2929  /* cosmetic... */
2930  txn->final_lsn = lsn;
2931 
2932  /*
2933  * Process cache invalidation messages if there are any. Even if we're not
2934  * interested in the transaction's contents, it could have manipulated the
2935  * catalog and we need to update the caches according to that.
2936  */
2937  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2938  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2939  txn->invalidations);
2940  else
2941  Assert(txn->ninvalidations == 0);
2942 
2943  /* remove potential on-disk data, and deallocate */
2944  ReorderBufferCleanupTXN(rb, txn);
2945 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 384 of file reorderbuffer.c.

385 {
386  MemoryContext context = rb->context;
387 
388  /*
389  * We free separately allocated data by entirely scrapping reorderbuffer's
390  * memory context.
391  */
392  MemoryContextDelete(context);
393 
394  /* Free disk space used by unconsumed reorder buffers */
395  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
396 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1815 of file reorderbuffer.c.

1816 {
1817  if (snap->copied)
1818  pfree(snap);
1819  else
1820  SnapBuildSnapDecRefcount(snap);
1821 }
void pfree(void *pointer)
Definition: mcxt.c:1175

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 966 of file reorderbuffer.c.

967 {
968  ReorderBufferTXN *txn;
969 
970  AssertTXNLsnOrder(rb);
971 
972  if (dlist_is_empty(&rb->toplevel_by_lsn))
973  return NULL;
974 
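975  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
976 
977  Assert(!rbtxn_is_known_subxact(txn));
978  Assert(txn->first_lsn != InvalidXLogRecPtr);
979  return txn;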
980 }
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:506

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

Definition at line 994 of file reorderbuffer.c.

995 {
996  ReorderBufferTXN *txn;
997 
998  AssertTXNLsnOrder(rb);
999 
1000  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1001  return InvalidTransactionId;
1002 
1003  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1004  &rb->txns_by_base_snapshot_lsn);
1005  return txn->base_snapshot->xmin;
1006 }
TransactionId xmin
Definition: snapshot.h:157

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetRelids()

Oid* ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 586 of file reorderbuffer.c.

587 {
588  Oid *relids;
589  Size alloc_len;
590 
591  alloc_len = sizeof(Oid) * nrelids;
592 
593  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
594 
595  return relids;
596 }

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 550 of file reorderbuffer.c.

551 {
552  ReorderBufferTupleBuf *tuple;
553  Size alloc_len;
554 
555  alloc_len = tuple_len + SizeofHeapTupleHeader;
556 
557  tuple = (ReorderBufferTupleBuf *)
558  MemoryContextAlloc(rb->tup_context,
559  sizeof(ReorderBufferTupleBuf) +
560  MAXIMUM_ALIGNOF + alloc_len);
561  tuple->alloc_tuple_size = alloc_len;
562  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
563 
564  return tuple;
565 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
HeapTupleHeader t_data
Definition: htup.h:68

References ReorderBufferTupleBuf::alloc_tuple_size, MemoryContextAlloc(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, HeapTupleData::t_data, ReorderBuffer::tup_context, and ReorderBufferTupleBuf::tuple.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 402 of file reorderbuffer.c.

403 {
404  ReorderBufferTXN *txn;
405 
406  txn = (ReorderBufferTXN *)
407  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
408 
409  memset(txn, 0, sizeof(ReorderBufferTXN));
410 
411  dlist_init(&txn->changes);
412  dlist_init(&txn->tuplecids);
413  dlist_init(&txn->subtxns);
414 
415  /* InvalidCommandId is not zero, so set it explicitly */
416  txn->command_id = InvalidCommandId;
417  txn->output_plugin_private = NULL;
418 
419  return txn;
420 }
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 2987 of file reorderbuffer.c.

2989 {
2990  bool use_subtxn = IsTransactionOrTransactionBlock();
2991  int i;
2992 
2993  if (use_subtxn)
2994  BeginInternalSubTransaction("replay");
2995 
2996  /*
2997  * Force invalidations to happen outside of a valid transaction - that way
2998  * entries will just be marked as invalid without accessing the catalog.
2999  * That's advantageous because we don't need to setup the full state
3000  * necessary for catalog access.
3001  */
3002  if (use_subtxn)
3003  AbortCurrentTransaction();
3004 
3005  for (i = 0; i < ninvalidations; i++)
3006  LocalExecuteInvalidationMessage(&invalidations[i]);
3007 
3008  if (use_subtxn)
3009  RollbackAndReleaseCurrentSubTransaction();
3010 }
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4784
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4490
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4595
void AbortCurrentTransaction(void)
Definition: xact.c:3293

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2956 of file reorderbuffer.c.

2957 {
2958  ReorderBufferTXN *txn;
2959 
2960  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2961  false);
2962 
2963  /* unknown, nothing to do */
2964  if (txn == NULL)
2965  return;
2966 
2967  /*
2968  * Process cache invalidation messages if there are any. Even if we're not
2969  * interested in the transaction's contents, it could have manipulated the
2970  * catalog and we need to update the caches according to that.
2971  */
2972  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2973  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2974  txn->invalidations);
2975  else
2976  Assert(txn->ninvalidations == 0);
2977 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1183 of file reorderbuffer.c.

1184 {
1185  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1186  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1187  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1188 
1189  if (pos_a < pos_b)
1190  return 1;
1191  else if (pos_a == pos_b)
1192  return 0;
1193  return -1;
1194 }
void * arg
#define DatumGetInt32(X)
Definition: postgres.h:516
Definition: regguts.h:318

References a, arg, b, and DatumGetInt32.

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1426 of file reorderbuffer.c.

1428 {
1429  int32 off;
1430 
1431  for (off = 0; off < state->nr_txns; off++)
1432  {
1433  if (state->entries[off].file.vfd != -1)
1434  FileClose(state->entries[off].file.vfd);
1435  }
1436 
1437  /* free memory we might have "leaked" in the last *Next call */
1438  if (!dlist_is_empty(&state->old_change))
1439  {
1440  ReorderBufferChange *change;
1441 
1442  change = dlist_container(ReorderBufferChange, node,
1443  dlist_pop_head_node(&state->old_change));
1444  ReorderBufferReturnChange(rb, change, true);
1445  Assert(dlist_is_empty(&state->old_change));
1446  }
1447 
1448  binaryheap_free(state->heap);
1449  pfree(state);
1450 }
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:68
void FileClose(File file)
Definition: fd.c:1961
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1206 of file reorderbuffer.c.

1208 {
1209  Size nr_txns = 0;
1210  ReorderBufferIterTXNState *state;
1211  dlist_iter cur_txn_i;
1212  int32 off;
1213 
1214  *iter_state = NULL;
1215 
1216  /* Check ordering of changes in the toplevel transaction. */
1217  AssertChangeLsnOrder(txn);
1218 
1219  /*
1220  * Calculate the size of our heap: one element for every transaction that
1221  * contains changes. (Besides the transactions already in the reorder
1222  * buffer, we count the one we were directly passed.)
1223  */
1224  if (txn->nentries > 0)
1225  nr_txns++;
1226 
1227  dlist_foreach(cur_txn_i, &txn->subtxns)
1228  {
1229  ReorderBufferTXN *cur_txn;
1230 
1231  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1232 
1233  /* Check ordering of changes in this subtransaction. */
1234  AssertChangeLsnOrder(cur_txn);
1235 
1236  if (cur_txn->nentries > 0)
1237  nr_txns++;
1238  }
1239 
1240  /* allocate iteration state */
1241  state = (ReorderBufferIterTXNState *)
1242  MemoryContextAllocZero(rb->context,
1243  sizeof(ReorderBufferIterTXNState) +
1244  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1245 
1246  state->nr_txns = nr_txns;
1247  dlist_init(&state->old_change);
1248 
1249  for (off = 0; off < state->nr_txns; off++)
1250  {
1251  state->entries[off].file.vfd = -1;
1252  state->entries[off].segno = 0;
1253  }
1254 
1255  /* allocate heap */
1256  state->heap = binaryheap_allocate(state->nr_txns,
1257  ReorderBufferIterCompare,
1258  state);
1259 
1260  /* Now that the state fields are initialized, it is safe to return it. */
1261  *iter_state = state;
1262 
1263  /*
1264  * Now insert items into the binary heap, in an unordered fashion. (We
1265  * will run a heap assembly step at the end; this is more efficient.)
1266  */
1267 
1268  off = 0;
1269 
1270  /* add toplevel transaction if it contains changes */
1271  if (txn->nentries > 0)
1272  {
1273  ReorderBufferChange *cur_change;
1274 
1275  if (rbtxn_is_serialized(txn))
1276  {
1277  /* serialize remaining changes */
1278  ReorderBufferSerializeTXN(rb, txn);
1279  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1280  &state->entries[off].segno);
1281  }
1282 
1283  cur_change = dlist_head_element(ReorderBufferChange, node,
1284  &txn->changes);
1285 
1286  state->entries[off].lsn = cur_change->lsn;
1287  state->entries[off].change = cur_change;
1288  state->entries[off].txn = txn;
1289 
1290  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1291  }
1292 
1293  /* add subtransactions if they contain changes */
1294  dlist_foreach(cur_txn_i, &txn->subtxns)
1295  {
1296  ReorderBufferTXN *cur_txn;
1297 
1298  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1299 
1300  if (cur_txn->nentries > 0)
1301  {
1302  ReorderBufferChange *cur_change;
1303 
1304  if (rbtxn_is_serialized(cur_txn))
1305  {
1306  /* serialize remaining changes */
1307  ReorderBufferSerializeTXN(rb, cur_txn);
1308  ReorderBufferRestoreChanges(rb, cur_txn,
1309  &state->entries[off].file,
1310  &state->entries[off].segno);
1311  }
1312  cur_change = dlist_head_element(ReorderBufferChange, node,
1313  &cur_txn->changes);
1314 
1315  state->entries[off].lsn = cur_change->lsn;
1316  state->entries[off].change = cur_change;
1317  state->entries[off].txn = cur_txn;
1318 
1319  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1320  }
1321  }
1322 
1323  /* assemble a valid binary heap */
1324  binaryheap_build(state->heap);
1325 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:125
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:109
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:32
#define Int32GetDatum(X)
Definition: postgres.h:523
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum, ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1334 of file reorderbuffer.c.

1335 {
1336  ReorderBufferChange *change;
1337  ReorderBufferIterTXNEntry *entry;
1338  int32 off;
1339 
1340  /* nothing there anymore */
1341  if (state->heap->bh_size == 0)
1342  return NULL;
1343 
1344  off = DatumGetInt32(binaryheap_first(state->heap));
1345  entry = &state->entries[off];
1346 
1347  /* free memory we might have "leaked" in the previous *Next call */
1348  if (!dlist_is_empty(&state->old_change))
1349  {
1350  change = dlist_container(ReorderBufferChange, node,
1351  dlist_pop_head_node(&state->old_change));
1352  ReorderBufferReturnChange(rb, change, true);
1353  Assert(dlist_is_empty(&state->old_change));
1354  }
1355 
1356  change = entry->change;
1357 
1358  /*
1359  * update heap with information about which transaction has the next
1360  * relevant change in LSN order
1361  */
1362 
1363  /* there are in-memory changes */
1364  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1365  {
1366  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1367  ReorderBufferChange *next_change =
1368  dlist_container(ReorderBufferChange, node, next);
1369 
1370  /* txn stays the same */
1371  state->entries[off].lsn = next_change->lsn;
1372  state->entries[off].change = next_change;
1373 
1374  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1375  return change;
1376  }
1377 
1378  /* try to load changes from disk */
1379  if (entry->txn->nentries != entry->txn->nentries_mem)
1380  {
1381  /*
1382  * Ugly: restoring changes will reuse *Change records, thus delete the
1383  * current one from the per-tx list and only free in the next call.
1384  */
1385  dlist_delete(&change->node);
1386  dlist_push_tail(&state->old_change, &change->node);
1387 
1388  /*
1389  * Update the total bytes processed by the txn for which we are
1390  * releasing the current set of changes and restoring the new set of
1391  * changes.
1392  */
1393  rb->totalBytes += entry->txn->size;
1394  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1395  &state->entries[off].segno))
1396  {
1397  /* successfully restored changes from disk */
1398  ReorderBufferChange *next_change =
1399  dlist_head_element(ReorderBufferChange, node,
1400  &entry->txn->changes);
1401 
1402  elog(DEBUG2, "restored %u/%u changes from disk",
1403  (uint32) entry->txn->nentries_mem,
1404  (uint32) entry->txn->nentries);
1405 
1406  Assert(entry->txn->nentries_mem);
1407  /* txn stays the same */
1408  state->entries[off].lsn = next_change->lsn;
1409  state->entries[off].change = next_change;
1410  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1411 
1412  return change;
1413  }
1414  }
1415 
1416  /* ok, no changes there anymore, remove */
1417  binaryheap_remove_first(state->heap);
1418 
1419  return change;
1420 }
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:173
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:207
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:158
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:441
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestTopTXN()

static ReorderBufferTXN* ReorderBufferLargestTopTXN ( ReorderBuffer rb)
static

Definition at line 3418 of file reorderbuffer.c.

3419 {
3420  dlist_iter iter;
3421  Size largest_size = 0;
3422  ReorderBufferTXN *largest = NULL;
3423 
3424  /* Find the largest top-level transaction having a base snapshot. */
3425  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3426  {
3427  ReorderBufferTXN *txn;
3428 
3429  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3430 
3431  /* must not be a subtxn */
3432  Assert(!rbtxn_is_known_subxact(txn));
3433  /* base_snapshot must be set */
3434  Assert(txn->base_snapshot != NULL);
3435 
3436  if ((largest == NULL || txn->total_size > largest_size) &&
3437  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3438  {
3439  largest = txn;
3440  largest_size = txn->total_size;
3441  }
3442  }
3443 
3444  return largest;
3445 }
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN* ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3371 of file reorderbuffer.c.

3372 {
3373  HASH_SEQ_STATUS hash_seq;
3374  ReorderBufferTXNByIdEnt *ent;
3375  ReorderBufferTXN *largest = NULL;
3376 
3377  hash_seq_init(&hash_seq, rb->by_txn);
3378  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3379  {
3380  ReorderBufferTXN *txn = ent->txn;
3381 
3382  /* if the current transaction is larger, remember it */
3383  if ((!largest) || (txn->size > largest->size))
3384  largest = txn;
3385  }
3386 
3387  Assert(largest);
3388  Assert(largest->size > 0);
3389  Assert(largest->size <= rb->size);
3390 
3391  return largest;
3392 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
ReorderBufferTXN * txn

References Assert(), ReorderBuffer::by_txn, hash_seq_init(), hash_seq_search(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXNByIdEnt::txn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2707 of file reorderbuffer.c.

2709 {
2710  ReorderBufferTXN *txn;
2711 
2712  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2713  false);
2714 
2715  /* unknown transaction, nothing to replay */
2716  if (txn == NULL)
2717  return;
2718 
2719  txn->txn_flags |= RBTXN_PREPARE;
2720  txn->gid = pstrdup(gid);
2721 
2722  /* The prepare info must have been updated in txn by now. */
2723  Assert(txn->final_lsn != InvalidXLogRecPtr);
2724 
2725  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2726  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2727 
2728  /*
2729  * We send the prepare for the concurrently aborted xacts so that later
2730  * when rollback prepared is decoded and sent, the downstream should be
2731  * able to rollback such a xact. See comments atop DecodePrepare.
2732  *
2733  * Note, for the concurrent_abort + streaming case a stream_prepare was
2734  * already sent within the ReorderBufferReplay call above.
2735  */
2736  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2737  rb->prepare(rb, txn, txn->final_lsn);
2738 }
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 702 of file reorderbuffer.c.

705 {
706  ReorderBufferTXN *toptxn;
707 
708  /*
709  * The partial changes need to be processed only while streaming
710  * in-progress transactions.
711  */
712  if (!ReorderBufferCanStream(rb))
713  return;
714 
715  /* Get the top transaction. */
716  if (txn->toptxn != NULL)
717  toptxn = txn->toptxn;
718  else
719  toptxn = txn;
720 
721  /*
722  * Indicate a partial change for toast inserts. The change will be
723  * considered as complete once we get the insert or update on the main
724  * table and we are sure that the pending toast chunks are not required
725  * anymore.
726  *
727  * If we allow streaming when there are pending toast chunks then such
728  * chunks won't be released till the insert (multi_insert) is complete and
729  * we expect the txn to have streamed all changes after streaming. This
730  * restriction is mainly to ensure the correctness of streamed
731  * transactions and it doesn't seem worth uplifting such a restriction
732  * just to allow this case because anyway we will stream the transaction
733  * once such an insert is complete.
734  */
735  if (toast_insert)
736  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
737  else if (rbtxn_has_partial_change(toptxn) &&
738  IsInsertOrUpdate(change->action) &&
739  change->data.tp.clear_toast_afterwards)
740  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
741 
742  /*
743  * Indicate a partial change for speculative inserts. The change will be
744  * considered as complete once we get the speculative confirm or abort
745  * token.
746  */
747  if (IsSpecInsert(change->action))
748  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
749  else if (rbtxn_has_partial_change(toptxn) &&
750  IsSpecConfirmOrAbort(change->action))
751  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
752 
753  /*
754  * Stream the transaction if it is serialized before and the changes are
755  * now complete in the top-level transaction.
756  *
757  * The reason for doing the streaming of such a transaction as soon as we
758  * get the complete change for it is that previously it would have reached
759  * the memory threshold and wouldn't get streamed because of incomplete
760  * changes. Delaying such transactions would increase apply lag for them.
761  */
762  if (ReorderBufferCanStartStreaming(rb) &&
763  !(rbtxn_has_partial_change(toptxn)) &&
764  rbtxn_is_serialized(txn))
765  ReorderBufferStreamTXN(rb, toptxn);
766 }
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferTXN::toptxn, ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2029 of file reorderbuffer.c.

/*
 * ReorderBufferProcessTXN
 *		Replay the queued changes of txn to the output plugin callbacks.
 *
 * A historic snapshot (snapshot_now) is installed and the work runs inside
 * a transaction, or an internal subtransaction when one is already open.
 * That (sub)transaction is always aborted at the end so nothing done here
 * persists.  When "streaming" is false the changes are bracketed by
 * begin/begin_prepare and commit/prepare; when it is true they are bracketed
 * by stream_start (issued on the first change) and stream_stop.
 *
 * On error the iterator, snapshot and (sub)transaction are torn down.  If the
 * error code is ERRCODE_TRANSACTION_ROLLBACK while a stream is open or the
 * transaction is prepared, the error is swallowed, curtxn is flagged as
 * concurrently aborted and the TXN is reset; any other error cleans up the
 * TXN and is re-thrown.
 *
 * NOTE(review): listing lines 2036, 2044, 2068, the case labels, 2135,
 * 2450/2452, 2472, 2475, 2478, 2496, 2498, 2512, 2518, 2521 and 2525 were
 * missing from the rendered page and are reconstructed from the identifiers
 * in this function's References list -- verify against reorderbuffer.c.
 */
2034 {
2035  bool using_subtxn;
2036  MemoryContext ccxt = CurrentMemoryContext;
2037  ReorderBufferIterTXNState *volatile iterstate = NULL;
2038  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2039  ReorderBufferChange *volatile specinsert = NULL;
2040  volatile bool stream_started = false;
2041  ReorderBufferTXN *volatile curtxn = NULL;
2042 
2043  /* build data to be able to lookup the CommandIds of catalog tuples */
2044  ReorderBufferBuildTupleCidHash(rb, txn);
2045 
2046  /* setup the initial snapshot */
2047  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2048 
2049  /*
2050  * Decoding needs access to syscaches et al., which in turn use
2051  * heavyweight locks and such. Thus we need to have enough state around to
2052  * keep track of those. The easiest way is to simply use a transaction
2053  * internally. That also allows us to easily enforce that nothing writes
2054  * to the database by checking for xid assignments.
2055  *
2056  * When we're called via the SQL SRF there's already a transaction
2057  * started, so start an explicit subtransaction there.
2058  */
2059  using_subtxn = IsTransactionOrTransactionBlock();
2060 
2061  PG_TRY();
2062  {
2063  ReorderBufferChange *change;
2064 
2065  if (using_subtxn)
2066  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2067  else
2068  StartTransactionCommand();
2069 
2070  /*
2071  * We only need to send begin/begin-prepare for non-streamed
2072  * transactions.
2073  */
2074  if (!streaming)
2075  {
2076  if (rbtxn_prepared(txn))
2077  rb->begin_prepare(rb, txn);
2078  else
2079  rb->begin(rb, txn);
2080  }
2081 
2082  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2083  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2084  {
2085  Relation relation = NULL;
2086  Oid reloid;
2087 
2088  /*
2089  * We can't call start stream callback before processing first
2090  * change.
2091  */
2092  if (prev_lsn == InvalidXLogRecPtr)
2093  {
2094  if (streaming)
2095  {
2096  txn->origin_id = change->origin_id;
2097  rb->stream_start(rb, txn, change->lsn);
2098  stream_started = true;
2099  }
2100  }
2101 
2102  /*
2103  * Enforce correct ordering of changes, merged from multiple
2104  * subtransactions. The changes may have the same LSN due to
2105  * MULTI_INSERT xlog records.
2106  */
2107  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2108 
2109  prev_lsn = change->lsn;
2110 
2111  /*
2112  * Set the current xid to detect concurrent aborts. This is
2113  * required for the cases when we decode the changes before the
2114  * COMMIT record is processed.
2115  */
2116  if (streaming || rbtxn_prepared(change->txn))
2117  {
2118  curtxn = change->txn;
2119  SetupCheckXidLive(curtxn->xid);
2120  }
2121 
2122  switch (change->action)
2123  {
2124  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2125 
2126  /*
2127  * Confirmation for speculative insertion arrived. Simply
2128  * use as a normal record. It'll be cleaned up at the end
2129  * of INSERT processing.
2130  */
2131  if (specinsert == NULL)
2132  elog(ERROR, "invalid ordering of speculative insertion changes");
2133  Assert(specinsert->data.tp.oldtuple == NULL);
2134  change = specinsert;
2135  change->action = REORDER_BUFFER_CHANGE_INSERT;
2136 
2137  /* intentionally fall through */
2138  case REORDER_BUFFER_CHANGE_INSERT:
2139  case REORDER_BUFFER_CHANGE_UPDATE:
2140  case REORDER_BUFFER_CHANGE_DELETE:
2141  Assert(snapshot_now);
2142 
2143  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2144  change->data.tp.relnode.relNode);
2145 
2146  /*
2147  * Mapped catalog tuple without data, emitted while
2148  * catalog table was in the process of being rewritten. We
2149  * can fail to look up the relfilenode, because the
2150  * relmapper has no "historic" view, in contrast to the
2151  * normal catalog during decoding. Thus repeated rewrites
2152  * can cause a lookup failure. That's OK because we do not
2153  * decode catalog changes anyway. Normally such tuples
2154  * would be skipped over below, but we can't identify
2155  * whether the table should be logically logged without
2156  * mapping the relfilenode to the oid.
2157  */
2158  if (reloid == InvalidOid &&
2159  change->data.tp.newtuple == NULL &&
2160  change->data.tp.oldtuple == NULL)
2161  goto change_done;
2162  else if (reloid == InvalidOid)
2163  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2164  relpathperm(change->data.tp.relnode,
2165  MAIN_FORKNUM));
2166 
2167  relation = RelationIdGetRelation(reloid);
2168 
2169  if (!RelationIsValid(relation))
2170  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2171  reloid,
2172  relpathperm(change->data.tp.relnode,
2173  MAIN_FORKNUM));
2174 
2175  if (!RelationIsLogicallyLogged(relation))
2176  goto change_done;
2177 
2178  /*
2179  * Ignore temporary heaps created during DDL unless the
2180  * plugin has asked for them.
2181  */
2182  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2183  goto change_done;
2184 
2185  /*
2186  * For now ignore sequence changes entirely. Most of the
2187  * time they don't log changes using records we
2188  * understand, so it doesn't make sense to handle the few
2189  * cases we do.
2190  */
2191  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2192  goto change_done;
2193 
2194  /* user-triggered change */
2195  if (!IsToastRelation(relation))
2196  {
2197  ReorderBufferToastReplace(rb, txn, relation, change);
2198  ReorderBufferApplyChange(rb, txn, relation, change,
2199  streaming);
2200 
2201  /*
2202  * Only clear reassembled toast chunks if we're sure
2203  * they're not required anymore. The creator of the
2204  * tuple tells us.
2205  */
2206  if (change->data.tp.clear_toast_afterwards)
2207  ReorderBufferToastReset(rb, txn);
2208  }
2209  /* we're not interested in toast deletions */
2210  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2211  {
2212  /*
2213  * Need to reassemble the full toasted Datum in
2214  * memory. To ensure the chunks don't get reused till
2215  * we're done, remove it from the list of this
2216  * transaction's changes. Otherwise it will get
2217  * freed/reused while restoring spooled data from
2218  * disk.
2219  */
2220  Assert(change->data.tp.newtuple != NULL);
2221 
2222  dlist_delete(&change->node);
2223  ReorderBufferToastAppendChunk(rb, txn, relation,
2224  change);
2225  }
2226 
2227  change_done:
2228 
2229  /*
2230  * If speculative insertion was confirmed, the record
2231  * isn't needed anymore.
2232  */
2233  if (specinsert != NULL)
2234  {
2235  ReorderBufferReturnChange(rb, specinsert, true);
2236  specinsert = NULL;
2237  }
2238 
2239  if (RelationIsValid(relation))
2240  {
2241  RelationClose(relation);
2242  relation = NULL;
2243  }
2244  break;
2245 
2246  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2247 
2248  /*
2249  * Speculative insertions are dealt with by delaying the
2250  * processing of the insert until the confirmation record
2251  * arrives. For that we simply unlink the record from the
2252  * chain, so it does not get freed/reused while restoring
2253  * spooled data from disk.
2254  *
2255  * This is safe in the face of concurrent catalog changes
2256  * because the relevant relation can't be changed between
2257  * speculative insertion and confirmation due to
2258  * CheckTableNotInUse() and locking.
2259  */
2260 
2261  /* clear out a pending (and thus failed) speculation */
2262  if (specinsert != NULL)
2263  {
2264  ReorderBufferReturnChange(rb, specinsert, true);
2265  specinsert = NULL;
2266  }
2267 
2268  /* and memorize the pending insertion */
2269  dlist_delete(&change->node);
2270  specinsert = change;
2271  break;
2272 
2273  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2274 
2275  /*
2276  * Abort for speculative insertion arrived. So cleanup the
2277  * specinsert tuple and toast hash.
2278  *
2279  * Note that we get the spec abort change for each toast
2280  * entry but we need to perform the cleanup only the first
2281  * time we get it for the main table.
2282  */
2283  if (specinsert != NULL)
2284  {
2285  /*
2286  * We must clean the toast hash before processing a
2287  * completely new tuple to avoid confusion about the
2288  * previous tuple's toast chunks.
2289  */
2290  Assert(change->data.tp.clear_toast_afterwards);
2291  ReorderBufferToastReset(rb, txn);
2292 
2293  /* We don't need this record anymore. */
2294  ReorderBufferReturnChange(rb, specinsert, true);
2295  specinsert = NULL;
2296  }
2297  break;
2298 
2299  case REORDER_BUFFER_CHANGE_TRUNCATE:
2300  {
2301  int i;
2302  int nrelids = change->data.truncate.nrelids;
2303  int nrelations = 0;
2304  Relation *relations;
2305 
2306  relations = palloc0(nrelids * sizeof(Relation));
2307  for (i = 0; i < nrelids; i++)
2308  {
2309  Oid relid = change->data.truncate.relids[i];
2310  Relation relation;
2311 
2312  relation = RelationIdGetRelation(relid);
2313 
2314  if (!RelationIsValid(relation))
2315  elog(ERROR, "could not open relation with OID %u", relid);
2316 
2317  if (!RelationIsLogicallyLogged(relation))
2318  continue;
2319 
2320  relations[nrelations++] = relation;
2321  }
2322 
2323  /* Apply the truncate. */
2324  ReorderBufferApplyTruncate(rb, txn, nrelations,
2325  relations, change,
2326  streaming);
2327 
2328  for (i = 0; i < nrelations; i++)
2329  RelationClose(relations[i]);
2330 
2331  break;
2332  }
2333 
2334  case REORDER_BUFFER_CHANGE_MESSAGE:
2335  ReorderBufferApplyMessage(rb, txn, change, streaming);
2336  break;
2337 
2338  case REORDER_BUFFER_CHANGE_INVALIDATION:
2339  /* Execute the invalidation messages locally */
2340  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2341  change->data.inval.invalidations);
2342  break;
2343 
2344  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2345  /* get rid of the old */
2346  TeardownHistoricSnapshot(false);
2347 
2348  if (snapshot_now->copied)
2349  {
2350  ReorderBufferFreeSnap(rb, snapshot_now);
2351  snapshot_now =
2352  ReorderBufferCopySnap(rb, change->data.snapshot,
2353  txn, command_id);
2354  }
2355 
2356  /*
2357  * Restored from disk, need to be careful not to double
2358  * free. We could introduce refcounting for that, but for
2359  * now this seems infrequent enough not to care.
2360  */
2361  else if (change->data.snapshot->copied)
2362  {
2363  snapshot_now =
2364  ReorderBufferCopySnap(rb, change->data.snapshot,
2365  txn, command_id);
2366  }
2367  else
2368  {
2369  snapshot_now = change->data.snapshot;
2370  }
2371 
2372  /* and continue with the new one */
2373  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2374  break;
2375 
2376  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2377  Assert(change->data.command_id != InvalidCommandId);
2378 
2379  if (command_id < change->data.command_id)
2380  {
2381  command_id = change->data.command_id;
2382 
2383  if (!snapshot_now->copied)
2384  {
2385  /* we don't use the global one anymore */
2386  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2387  txn, command_id);
2388  }
2389 
2390  snapshot_now->curcid = command_id;
2391 
2392  TeardownHistoricSnapshot(false);
2393  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2394  }
2395 
2396  break;
2397 
2398  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2399  elog(ERROR, "tuplecid value in changequeue");
2400  break;
2401  }
2402  }
2403 
2404  /* speculative insertion record must be freed by now */
2405  Assert(!specinsert);
2406 
2407  /* clean up the iterator */
2408  ReorderBufferIterTXNFinish(rb, iterstate);
2409  iterstate = NULL;
2410 
2411  /*
2412  * Update total transaction count and total bytes processed by the
2413  * transaction and its subtransactions. Ensure to not count the
2414  * streamed transaction multiple times.
2415  *
2416  * Note that the statistics computation has to be done after
2417  * ReorderBufferIterTXNFinish as it releases the serialized change
2418  * which we have already accounted in ReorderBufferIterTXNNext.
2419  */
2420  if (!rbtxn_is_streamed(txn))
2421  rb->totalTxns++;
2422 
2423  rb->totalBytes += txn->total_size;
2424 
2425  /*
2426  * Done with current changes, send the last message for this set of
2427  * changes depending upon streaming mode.
2428  */
2429  if (streaming)
2430  {
2431  if (stream_started)
2432  {
2433  rb->stream_stop(rb, txn, prev_lsn);
2434  stream_started = false;
2435  }
2436  }
2437  else
2438  {
2439  /*
2440  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2441  * regular ones).
2442  */
2443  if (rbtxn_prepared(txn))
2444  rb->prepare(rb, txn, commit_lsn);
2445  else
2446  rb->commit(rb, txn, commit_lsn);
2447  }
2448 
2449  /* this is just a sanity check against bad output plugin behaviour */
2450  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2451  elog(ERROR, "output plugin used XID %u",
2452  GetCurrentTransactionId());
2453 
2454  /*
2455  * Remember the command ID and snapshot for the next set of changes in
2456  * streaming mode.
2457  */
2458  if (streaming)
2459  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2460  else if (snapshot_now->copied)
2461  ReorderBufferFreeSnap(rb, snapshot_now);
2462 
2463  /* cleanup */
2464  TeardownHistoricSnapshot(false);
2465 
2466  /*
2467  * Aborting the current (sub-)transaction as a whole has the right
2468  * semantics. We want all locks acquired in here to be released, not
2469  * reassigned to the parent and we do not want any database access
2470  * to have persistent effects.
2471  */
2472  AbortCurrentTransaction();
2473 
2474  /* make sure there's no cache pollution */
2475  ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2476 
2477  if (using_subtxn)
2478  RollbackAndReleaseCurrentSubTransaction();
2479 
2480  /*
2481  * We are here due to one of the four reasons: 1. Decoding an
2482  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2483  * prepared txn that was (partially) streamed. 4. Decoding a committed
2484  * txn.
2485  *
2486  * For 1, we allow truncation of txn data by removing the changes
2487  * already streamed but still keeping other things like invalidations,
2488  * snapshot, and tuplecids. For 2 and 3, we indicate
2489  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2490  * data as the entire transaction has been decoded except for commit.
2491  * For 4, as the entire txn has been decoded, we can fully clean up
2492  * the TXN reorder buffer.
2493  */
2494  if (streaming || rbtxn_prepared(txn))
2495  {
2496  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2497  /* Reset the CheckXidAlive */
2498  CheckXidAlive = InvalidTransactionId;
2499  }
2500  else
2501  ReorderBufferCleanupTXN(rb, txn);
2502  }
2503  PG_CATCH();
2504  {
2505  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2506  ErrorData *errdata = CopyErrorData();
2507 
2508  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2509  if (iterstate)
2510  ReorderBufferIterTXNFinish(rb, iterstate);
2511 
2512  TeardownHistoricSnapshot(true);
2513 
2514  /*
2515  * Force cache invalidation to happen outside of a valid transaction
2516  * to prevent catalog access as we just caught an error.
2517  */
2518  AbortCurrentTransaction();
2519 
2520  /* make sure there's no cache pollution */
2521  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2522  txn->invalidations);
2523 
2524  if (using_subtxn)
2525  RollbackAndReleaseCurrentSubTransaction();
2526 
2527  /*
2528  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2529  * abort of the (sub)transaction we are streaming or preparing. We
2530  * need to do the cleanup and return gracefully on this error, see
2531  * SetupCheckXidLive.
2532  *
2533  * This error code can be thrown by one of the callbacks we call
2534  * during decoding so we need to ensure that we return gracefully only
2535  * when we are sending the data in streaming mode and the streaming is
2536  * not finished yet or when we are sending the data out on a PREPARE
2537  * during a two-phase commit.
2538  */
2539  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2540  (stream_started || rbtxn_prepared(txn)))
2541  {
2542  /* curtxn must be set for streaming or prepared transactions */
2543  Assert(curtxn);
2544 
2545  /* Cleanup the temporary error state. */
2546  FlushErrorState();
2547  FreeErrorData(errdata);
2548  errdata = NULL;
2549  curtxn->concurrent_abort = true;
2550 
2551  /* Reset the TXN so that it is allowed to stream remaining data. */
2552  ReorderBufferResetTXN(rb, txn, snapshot_now,
2553  command_id, prev_lsn,
2554  specinsert);
2555  }
2556  else
2557  {
2558  ReorderBufferCleanupTXN(rb, txn);
2559  MemoryContextSwitchTo(ecxt);
2560  PG_RE_THROW();
2561  }
2562  }
2563  PG_END_TRY();
2564 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:147
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1611
void FlushErrorState(void)
Definition: elog.c:1649
ErrorData * CopyErrorData(void)
Definition: elog.c:1555
#define PG_RE_THROW()
Definition: elog.h:340
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define PG_CATCH()
Definition: elog.h:309
void * palloc0(Size size)
Definition: mcxt.c:1099
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:686
#define RelationIsValid(relation)
Definition: rel.h:462
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2053
void RelationClose(Relation relation)
Definition: relcache.c:2159
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
@ MAIN_FORKNUM
Definition: relpath.h:43
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2094
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2078
int sqlerrcode
Definition: elog.h:367
Form_pg_class rd_rel
Definition: rel.h:109
RepOriginId origin_id
Definition: reorderbuffer.h:90
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:98
void StartTransactionCommand(void)
Definition: xact.c:2925
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:458
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:441

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CheckXidAlive, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenode(), relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), 
ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3023 of file reorderbuffer.c.

/*
 * ReorderBufferProcessXid
 *		Hand xid and lsn to ReorderBufferTXNByXid() unless xid is
 *		InvalidTransactionId, in which case nothing happens.  The lookup's
 *		return value is discarded; only its side effects matter here.
 *
 * NOTE(review): the two "true" arguments presumably ask the lookup to create
 * a missing entry and to treat the xid as top-level -- confirm against
 * ReorderBufferTXNByXid().
 */
3024 {
3025  /* many records won't have an xid assigned, centralize check here */
3026  if (xid != InvalidTransactionId)
3027  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3028 }

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 773 of file reorderbuffer.c.

/*
 * ReorderBufferQueueChange
 *		Append change to the in-memory change list of the transaction
 *		identified by xid.
 *
 * The transaction is looked up via ReorderBufferTXNByXid().  If it has been
 * flagged concurrent_abort the change is handed back with
 * ReorderBufferReturnChange() and nothing is queued.  Otherwise the change is
 * stamped with lsn and its owning txn, pushed onto txn->changes, the entry
 * counters and memory accounting are updated, partial-change tracking runs
 * (toast_insert is forwarded to it), and finally the memory limit is checked.
 */
775 {
776  ReorderBufferTXN *txn;
777 
778  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
779 
780  /*
781  * While streaming the previous changes we have detected that the
782  * transaction is aborted. So there is no point in collecting further
783  * changes for it.
784  */
785  if (txn->concurrent_abort)
786  {
787  /*
788  * We don't need to update memory accounting for this change as we
789  * have not added it to the queue yet.
790  */
791  ReorderBufferReturnChange(rb, change, false);
792  return;
793  }
794 
795  change->lsn = lsn;
796  change->txn = txn;
797 
798  Assert(InvalidXLogRecPtr != lsn);
799  dlist_push_tail(&txn->changes, &change->node);
800  txn->nentries++;
801  txn->nentries_mem++;
802 
803  /* update memory accounting information */
804  ReorderBufferChangeMemoryUpdate(rb, change, true,
805  ReorderBufferChangeSize(change));
806 
807  /* process partial change */
808  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
809 
810  /* check the memory limits and evict something if needed */
811  ReorderBufferCheckMemoryLimit(rb);
812 }
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)

References Assert(), ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), and ReorderBufferChange::txn.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 819 of file reorderbuffer.c.

/*
 * ReorderBufferQueueMessage
 *		Queue a transactional logical message, or deliver a
 *		non-transactional one to the output plugin immediately.
 *
 * Transactional: a change is allocated in rb->context, the prefix and the
 * message bytes are copied into it, and it is queued through
 * ReorderBufferQueueChange().
 *
 * Non-transactional: the historic snapshot passed in is installed, the
 * plugin's message callback is invoked directly (txn stays NULL when xid is
 * InvalidTransactionId), and the snapshot is torn down again on both the
 * success and the error path before any error is re-thrown.
 *
 * NOTE(review): listing lines 829, 834, 858 and 862 were missing from the
 * rendered page and are reconstructed from this function's References list
 * (Assert, ReorderBufferChange::action, REORDER_BUFFER_CHANGE_MESSAGE,
 * TeardownHistoricSnapshot) -- verify against reorderbuffer.c.
 */
823 {
824  if (transactional)
825  {
826  MemoryContext oldcontext;
827  ReorderBufferChange *change;
828 
829  Assert(xid != InvalidTransactionId);
830 
831  oldcontext = MemoryContextSwitchTo(rb->context);
832 
833  change = ReorderBufferGetChange(rb);
834  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
835  change->data.msg.prefix = pstrdup(prefix);
836  change->data.msg.message_size = message_size;
837  change->data.msg.message = palloc(message_size);
838  memcpy(change->data.msg.message, message, message_size);
839 
840  ReorderBufferQueueChange(rb, xid, lsn, change, false);
841 
842  MemoryContextSwitchTo(oldcontext);
843  }
844  else
845  {
846  ReorderBufferTXN *txn = NULL;
847  volatile Snapshot snapshot_now = snapshot;
848 
849  if (xid != InvalidTransactionId)
850  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
851 
852  /* setup snapshot to allow catalog access */
853  SetupHistoricSnapshot(snapshot_now, NULL);
854  PG_TRY();
855  {
856  rb->message(rb, txn, lsn, false, prefix, message_size, message);
857 
858  TeardownHistoricSnapshot(false);
859  }
860  PG_CATCH();
861  {
862  TeardownHistoricSnapshot(true);
863  PG_RE_THROW();
864  }
865  PG_END_TRY();
866  }
867 }

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2660 of file reorderbuffer.c.

/*
 * ReorderBufferRememberPrepareInfo
 *		Store the PREPARE details (prepare_lsn, end_lsn, prepare_time,
 *		origin_id, origin_lsn) in the transaction identified by xid.
 *
 * Returns false, changing nothing, when the lookup yields no transaction;
 * returns true after the fields have been recorded.  prepare_lsn is kept in
 * txn->final_lsn and prepare_time in txn->xact_time.prepare_time.
 *
 * NOTE(review): the "false" arguments to ReorderBufferTXNByXid() presumably
 * mean "do not create a missing entry" -- confirm against that function.
 */
2664 {
2665  ReorderBufferTXN *txn;
2666 
2667  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2668 
2669  /* unknown transaction, nothing to do */
2670  if (txn == NULL)
2671  return false;
2672 
2673  /*
2674  * Remember the prepare information to be later used by commit prepared in
2675  * case we skip doing prepare.
2676  */
2677  txn->final_lsn = prepare_lsn;
2678  txn->end_lsn = end_lsn;
2679  txn->xact_time.prepare_time = prepare_time;
2680  txn->origin_id = origin_id;
2681  txn->origin_lsn = origin_lsn;
2682 
2683  return true;
2684 }

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2577 of file reorderbuffer.c.

/*
 * ReorderBufferReplay
 *		Record the final commit details in txn and send its changes to the
 *		output plugin.
 *
 * commit_lsn, end_lsn, commit_time, origin_id and origin_lsn are stored in
 * txn first.  Then one of three things happens:
 *   - txn was already (partially) streamed: ReorderBufferStreamCommit()
 *     finishes it;
 *   - txn has no base snapshot: there is nothing to decode, and the txn is
 *     cleaned up unless it is prepared;
 *   - otherwise ReorderBufferProcessTXN() replays it in non-streaming mode,
 *     starting from txn->base_snapshot and FirstCommandId.
 *
 * The xid parameter is not referenced anywhere in the body shown.
 */
2582 {
2583  Snapshot snapshot_now;
2584  CommandId command_id = FirstCommandId;
2585 
2586  txn->final_lsn = commit_lsn;
2587  txn->end_lsn = end_lsn;
2588  txn->xact_time.commit_time = commit_time;
2589  txn->origin_id = origin_id;
2590  txn->origin_lsn = origin_lsn;
2591 
2592  /*
2593  * If the transaction was (partially) streamed, we need to commit it in a
2594  * 'streamed' way. That is, we first stream the remaining part of the
2595  * transaction, and then invoke stream_commit message.
2596  *
2597  * Called after everything (origin ID, LSN, ...) is stored in the
2598  * transaction to avoid passing that information directly.
2599  */
2600  if (rbtxn_is_streamed(txn))
2601  {
2602  ReorderBufferStreamCommit(rb, txn);
2603  return;
2604  }
2605 
2606  /*
2607  * If this transaction has no snapshot, it didn't make any changes to the
2608  * database, so there's nothing to decode. Note that
2609  * ReorderBufferCommitChild will have transferred any snapshots from
2610  * subtransactions if there were any.
2611  */
2612  if (txn->base_snapshot == NULL)
2613  {
2614  Assert(txn->ninvalidations == 0);
2615 
2616  /*
2617  * Removing this txn before a commit might result in the computation
2618  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2619  */
2620  if (!rbtxn_prepared(txn))
2621  ReorderBufferCleanupTXN(rb, txn);
2622  return;
2623  }
2624 
2625  snapshot_now = txn->base_snapshot;
2626 
2627  /* Process and send the changes to output plugin. */
2628  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2629  command_id, false);
2630 }
#define FirstCommandId
Definition: c.h:603
uint32 CommandId
Definition: c.h:601
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *  specinsert 
)
static

Definition at line 1986 of file reorderbuffer.c.

/*
 * ReorderBufferResetTXN
 *		Reset txn after replay was interrupted so that its remaining data
 *		can still be streamed later.
 *
 * The already-processed changes are truncated, toast reconstruction state is
 * freed, and a pending speculative-insert change (if any) is returned.  For
 * a streamed transaction the stream is then stopped at last_lsn and
 * snapshot_now/command_id are saved for the next streaming run.
 *
 * specinsert is passed by value, so setting it to NULL below only clears
 * this function's local copy.
 *
 * NOTE(review): listing line 1993 was missing from the rendered page and is
 * reconstructed from this function's References list
 * (ReorderBufferTruncateTXN, rbtxn_prepared) -- verify against
 * reorderbuffer.c.
 */
1991 {
1992  /* Discard the changes that we just streamed */
1993  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
1994 
1995  /* Free all resources allocated for toast reconstruction */
1996  ReorderBufferToastReset(rb, txn);
1997 
1998  /* Return the spec insert change if it is not NULL */
1999  if (specinsert != NULL)
2000  {
2001  ReorderBufferReturnChange(rb, specinsert, true);
2002  specinsert = NULL;
2003  }
2004 
2005  /*
2006  * For the streaming case, stop the stream and remember the command ID and
2007  * snapshot for the streaming run.
2008  */
2009  if (rbtxn_is_streamed(txn))
2010  {
2011  rb->stream_stop(rb, txn, last_lsn);
2012  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2013  }
2014 }

References rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  data 
)
static

Definition at line 4208 of file reorderbuffer.c.

4210 {
4211  ReorderBufferDiskChange *ondisk;
4212  ReorderBufferChange *change;
4213 
4214  ondisk = (ReorderBufferDiskChange *) data;
4215 
4216  change = ReorderBufferGetChange(rb);
4217 
4218  /* copy static part */
4219  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4220 
4221  data += sizeof(ReorderBufferDiskChange);
4222 
4223  /* restore individual stuff */
4224  switch (change->action)
4225  {
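4226  /* fall through these, they're all similar enough */
4227  case REORDER_BUFFER_CHANGE_INSERT:
4228  case REORDER_BUFFER_CHANGE_UPDATE:
4229  case REORDER_BUFFER_CHANGE_DELETE:
4230  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: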
4231  if (change->data.tp.oldtuple)
4232  {
4233  uint32 tuplelen = ((HeapTuple) data)->t_len;
4234 
4235  change->data.tp.oldtuple =
4236  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4237 
4238  /* restore ->tuple */
4239  memcpy(&change->data.tp.oldtuple->tuple, data,
4240  sizeof(HeapTupleData));
4241  data += sizeof(HeapTupleData);
4242 
4243  /* reset t_data pointer into the new tuplebuf */
4244  change->data.tp.oldtuple->tuple.t_data =
4245  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4246 
4247  /* restore tuple data itself */
4248  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4249  data += tuplelen;
4250  }
4251 
4252  if (change->data.tp.newtuple)
4253  {
4254  /* here, data might not be suitably aligned! */
4255  uint32 tuplelen;
4256 
4257  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4258  sizeof(uint32));
4259 
4260  change->data.tp.newtuple =
4261  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4262 
4263  /* restore ->tuple */
4264  memcpy(&change->data.tp.newtuple->tuple, data,
4265  sizeof(HeapTupleData));
4266  data += sizeof(HeapTupleData);
4267 
4268  /* reset t_data pointer into the new tuplebuf */
4269  change->data.tp.newtuple->tuple.t_data =
4270  ReorderBufferTupleBufData(change->data.tp.newtuple);
4271 
4272  /* restore tuple data itself */
4273  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4274  data += tuplelen;
4275  }
4276 
4277  break;
4278  case REORDER_BUFFER_CHANGE_MESSAGE:
4279  {
4280  Size prefix_size;
4281 
4282  /* read prefix */
4283  memcpy(&prefix_size, data, sizeof(Size));
4284  data += sizeof(Size);
4285  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4286  prefix_size);
4287  memcpy(change->data.msg.prefix, data, prefix_size);
4288  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4289  data += prefix_size;
4290 
4291  /* read the message */
4292  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4293  data += sizeof(Size);
4294  change->data.msg.message = MemoryContextAlloc(rb->context,
4295  change->data.msg.message_size);
4296  memcpy(change->data.msg.message, data,
4297  change->data.msg.message_size);
4298  data += change->data.msg.message_size;
4299 
4300  break;
4301  }
4302  case REORDER_BUFFER_CHANGE_INVALIDATION:
4303  {
4304  Size inval_size = sizeof(SharedInvalidationMessage) *
4305  change->data.inval.ninvalidations;
4306 
4307  change->data.inval.invalidations =
4308  MemoryContextAlloc(rb->context, inval_size);
4309 
4310  /* read the message */
4311  memcpy(change->data.inval.invalidations, data, inval_size);
4312 
4313  break;
4314  }
4315  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4316  {
4317  Snapshot oldsnap;
4318  Snapshot newsnap;
4319  Size size;
4320 
4321  oldsnap = (Snapshot) data;
4322 
4323  size = sizeof(SnapshotData) +
4324  sizeof(TransactionId) * oldsnap->xcnt +
4325  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4326 
4327  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4328 
4329  newsnap = change->data.snapshot;
4330 
4331  memcpy(newsnap, data, size);
4332  newsnap->xip = (TransactionId *)
4333  (((char *) newsnap) + sizeof(SnapshotData));
4334  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4335  newsnap->copied = true;
4336  break;
4337  }
4338  /* the base struct contains all the data, easy peasy */
4339  case REORDER_BUFFER_CHANGE_TRUNCATE:
4340  {
4341  Oid *relids;
4342 
4343  relids = ReorderBufferGetRelids(rb,
4344  change->data.truncate.nrelids);
4345  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4346  change->data.truncate.relids = relids;
4347 
4348  break;
4349  }
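4350  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4351  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4352  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4353  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4354  break;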
4355  }
4356 
4357  dlist_push_tail(&txn->changes, &change->node);
4358  txn->nentries_mem++;
4359 
4360  /*
4361  * Update memory accounting for the restored change. We need to do this
4362  * although we don't check the memory limit when restoring the changes in
4363  * this branch (we only do that when initially queueing the changes after
4364  * decoding), because we will release the changes later, and that will
4365  * update the accounting too (subtracting the size from the counters). And
4366  * we don't want to underflow there.
4367  */
4368  ReorderBufferChangeMemoryUpdate(rb, change, true,
4369  ReorderBufferChangeSize(change));
4370 }
#define offsetof(type, field)
Definition: c.h:727
HeapTupleData * HeapTuple
Definition: htup.h:71
struct ReorderBufferDiskChange ReorderBufferDiskChange
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
struct SnapshotData * Snapshot
Definition: snapshot.h:121
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), ReorderBufferChange::inval, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4067 of file reorderbuffer.c.

4069 {
4070  Size restored = 0;
4071  XLogSegNo last_segno;
4072  dlist_mutable_iter cleanup_iter;
4073  File *fd = &file->vfd;
4074 
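4075  Assert(txn->first_lsn != InvalidXLogRecPtr);
4076  Assert(txn->final_lsn != InvalidXLogRecPtr);
4077 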
4078  /* free current entries, so we have memory for more */
4079  dlist_foreach_modify(cleanup_iter, &txn->changes)
4080  {
4081  ReorderBufferChange *cleanup =
4082  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4083 
4084  dlist_delete(&cleanup->node);
4085  ReorderBufferReturnChange(rb, cleanup, true);
4086  }
4087  txn->nentries_mem = 0;
4088  Assert(dlist_is_empty(&txn->changes));
4089 
4090  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4091 
4092  while (restored < max_changes_in_memory && *segno <= last_segno)
4093  {
4094  int readBytes;
4095  ReorderBufferDiskChange *ondisk;
4096 
4097  if (*fd == -1)
4098  {
4099  char path[MAXPGPATH];
4100 
4101  /* first time in */
4102  if (*segno == 0)
4103  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4104 
4105  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4106 
4107  /*
4108  * No need to care about TLIs here, only used during a single run,
4109  * so each LSN only maps to a specific WAL record.
4110  */
4111  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4112  *segno);
4113 
4114  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4115 
4116  /* No harm in resetting the offset even in case of failure */
4117  file->curOffset = 0;
4118 
4119  if (*fd < 0 && errno == ENOENT)
4120  {
4121  *fd = -1;
4122  (*segno)++;
4123  continue;
4124  }
4125  else if (*fd < 0)
4126  ereport(ERROR,
4128  errmsg("could not open file \"%s\": %m",
4129  path)));
4130  }
4131 
4132  /*
4133  * Read the statically sized part of a change which has information
4134  * about the total size. If we couldn't read a record, we're at the
4135  * end of this file.
4136  */
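4137  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4138  readBytes = FileRead(file->vfd, rb->outbuf,
4139  sizeof(ReorderBufferDiskChange),
4140  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);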
4141 
4142  /* eof */
4143  if (readBytes == 0)
4144  {
4145  FileClose(*fd);
4146  *fd = -1;
4147  (*segno)++;
4148  continue;
4149  }
4150  else if (readBytes < 0)
4151  ereport(ERROR,
4153  errmsg("could not read from reorderbuffer spill file: %m")));
4154  else if (readBytes != sizeof(ReorderBufferDiskChange))
4155  ereport(ERROR,
4157  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4158  readBytes,
4159  (uint32) sizeof(ReorderBufferDiskChange))));
4160 
4161  file->curOffset += readBytes;
4162 
4163  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4164 
4165  ReorderBufferSerializeReserve(rb,
4166  sizeof(ReorderBufferDiskChange) + ondisk->size);
4167  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4168 
4169  readBytes = FileRead(file->vfd,
4170  rb->outbuf + sizeof(ReorderBufferDiskChange),
4171  ondisk->size - sizeof(ReorderBufferDiskChange),
4172  file->curOffset,
4173  WAIT_EVENT_REORDER_BUFFER_READ);
4174 
4175  if (readBytes < 0)
4176  ereport(ERROR,
4178  errmsg("could not read from reorderbuffer spill file: %m")));
4179  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4180  ereport(ERROR,
4182  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4183  readBytes,
4184  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4185 
4186  file->curOffset += readBytes;
4187 
4188  /*
4189  * ok, read a full change from disk, now restore it into proper
4190  * in-memory format
4191  */
4192  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4193  restored++;
4194  }
4195 
4196  return restored;
4197 }
static void cleanup(void)
Definition: bootstrap.c:697
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1566
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2112
int File
Definition: fd.h:54
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
@ WAIT_EVENT_REORDER_BUFFER_READ
Definition: wait_event.h:198
int wal_segment_size
Definition: xlog.c:144
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert(), ReorderBufferTXN::changes, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, WAIT_EVENT_REORDER_BUFFER_READ, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4376 of file reorderbuffer.c.

4377 {
4378  XLogSegNo first;
4379  XLogSegNo cur;
4380  XLogSegNo last;
4381 
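4382  Assert(txn->first_lsn != InvalidXLogRecPtr);
4383  Assert(txn->final_lsn != InvalidXLogRecPtr);
4384 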
4385  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4386  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4387 
4388  /* iterate over all possible filenames, and delete them */
4389  for (cur = first; cur <= last; cur++)
4390  {
4391  char path[MAXPGPATH];
4392 
4393  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4394  if (unlink(path) != 0 && errno != ENOENT)
4395  ereport(ERROR,
4397  errmsg("could not remove file \"%s\": %m", path)));
4398  }
4399 }
struct cursor * cur
Definition: ecpg.c:28

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 480 of file reorderbuffer.c.

482 {
483  /* update memory accounting info */
484  if (upd_mem)
485  ReorderBufferChangeMemoryUpdate(rb, change, false,
486  ReorderBufferChangeSize(change));
487 
488  /* free contained data */
489  switch (change->action)
490  {
495  if (change->data.tp.newtuple)
496  {
497  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
498  change->data.tp.newtuple = NULL;
499  }
500 
501  if (change->data.tp.oldtuple)
502  {
503  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
504  change->data.tp.oldtuple = NULL;
505  }
506  break;
508  if (change->data.msg.prefix != NULL)
509  pfree(change->data.msg.prefix);
510  change->data.msg.prefix = NULL;
511  if (change->data.msg.message != NULL)
512  pfree(change->data.msg.message);
513  change->data.msg.message = NULL;
514  break;
516  if (change->data.inval.invalidations)
517  pfree(change->data.inval.invalidations);
518  change->data.inval.invalidations = NULL;
519  break;
521  if (change->data.snapshot)
522  {
523  ReorderBufferFreeSnap(rb, change->data.snapshot);
524  change->data.snapshot = NULL;
525  }
526  break;
527  /* no data in addition to the struct itself */
529  if (change->data.truncate.relids != NULL)
530  {
531  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
532  change->data.truncate.relids = NULL;
533  }
534  break;
539  break;
540  }
541 
542  pfree(change);
543 }
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::msg, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 602 of file reorderbuffer.c.

603 {
604  pfree(relids);
605 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 571 of file reorderbuffer.c.

572 {
573  pfree(tuple);
574 }

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 426 of file reorderbuffer.c.

427 {
428  /* clean the lookup cache if we were cached (quite likely) */
429  if (rb->by_txn_last_xid == txn->xid)
430  {
431  rb->by_txn_last_xid = InvalidTransactionId;
432  rb->by_txn_last_txn = NULL;
433  }
434 
435  /* free data that's contained */
436 
437  if (txn->gid != NULL)
438  {
439  pfree(txn->gid);
440  txn->gid = NULL;
441  }
442 
443  if (txn->tuplecid_hash != NULL)
444  {
445  hash_destroy(txn->tuplecid_hash);
446  txn->tuplecid_hash = NULL;
447  }
448 
449  if (txn->invalidations)
450  {
451  pfree(txn->invalidations);
452  txn->invalidations = NULL;
453  }
454 
455  /* Reset the toast hash */
456  ReorderBufferToastReset(rb, txn);
457 
458  pfree(txn);
459 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 1965 of file reorderbuffer.c.

1967 {
1968  txn->command_id = command_id;
1969 
1970  /* Avoid copying if it's already copied. */
1971  if (snapshot_now->copied)
1972  txn->snapshot_now = snapshot_now;
1973  else
1974  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1975  txn, command_id);
1976 }

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 3615 of file reorderbuffer.c.

3617 {
3618  ReorderBufferDiskChange *ondisk;
3619  Size sz = sizeof(ReorderBufferDiskChange);
3620 
3622 
3623  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3624  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3625 
3626  switch (change->action)
3627  {
3628  /* fall through these, they're all similar enough */
3633  {
3634  char *data;
3635  ReorderBufferTupleBuf *oldtup,
3636  *newtup;
3637  Size oldlen = 0;
3638  Size newlen = 0;
3639 
3640  oldtup = change->data.tp.oldtuple;
3641  newtup = change->data.tp.newtuple;
3642 
3643  if (oldtup)
3644  {
3645  sz += sizeof(HeapTupleData);
3646  oldlen = oldtup->tuple.t_len;
3647  sz += oldlen;
3648  }
3649 
3650  if (newtup)
3651  {
3652  sz += sizeof(HeapTupleData);
3653  newlen = newtup->tuple.t_len;
3654  sz += newlen;
3655  }
3656 
3657  /* make sure we have enough space */
3659 
3660  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3661  /* might have been reallocated above */
3662  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3663 
3664  if (oldlen)
3665  {
3666  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3667  data += sizeof(HeapTupleData);
3668 
3669  memcpy(data, oldtup->tuple.t_data, oldlen);
3670  data += oldlen;
3671  }
3672 
3673  if (newlen)
3674  {
3675  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3676  data += sizeof(HeapTupleData);
3677 
3678  memcpy(data, newtup->tuple.t_data, newlen);
3679  data += newlen;
3680  }
3681  break;
3682  }
3684  {
3685  char *data;
3686  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3687 
3688  sz += prefix_size + change->data.msg.message_size +
3689  sizeof(Size) + sizeof(Size);
3691 
3692  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3693 
3694  /* might have been reallocated above */
3695  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3696 
3697  /* write the prefix including the size */
3698  memcpy(data, &prefix_size, sizeof(Size));
3699  data += sizeof(Size);
3700  memcpy(data, change->data.msg.prefix,
3701  prefix_size);
3702  data += prefix_size;
3703 
3704  /* write the message including the size */
3705  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3706  data += sizeof(Size);
3707  memcpy(data, change->data.msg.message,
3708  change->data.msg.message_size);
3709  data += change->data.msg.message_size;
3710 
3711  break;
3712  }
3714  {
3715  char *data;
3716  Size inval_size = sizeof(SharedInvalidationMessage) *
3717  change->data.inval.ninvalidations;
3718 
3719  sz += inval_size;
3720 
3722  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3723 
3724  /* might have been reallocated above */
3725  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3726  memcpy(data, change->data.inval.invalidations, inval_size);
3727  data += inval_size;
3728 
3729  break;
3730  }
3732  {
3733  Snapshot snap;
3734  char *data;
3735 
3736  snap = change->data.snapshot;
3737 
3738  sz += sizeof(SnapshotData) +
3739  sizeof(TransactionId) * snap->xcnt +
3740  sizeof(TransactionId) * snap->subxcnt;
3741 
3742  /* make sure we have enough space */
3744  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3745  /* might have been reallocated above */
3746  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3747 
3748  memcpy(data, snap, sizeof(SnapshotData));
3749  data += sizeof(SnapshotData);
3750 
3751  if (snap->xcnt)
3752  {
3753  memcpy(data, snap->xip,
3754  sizeof(TransactionId) * snap->xcnt);
3755  data += sizeof(TransactionId) * snap->xcnt;
3756  }
3757 
3758  if (snap->subxcnt)
3759  {
3760  memcpy(data, snap->subxip,
3761  sizeof(TransactionId) * snap->subxcnt);
3762  data += sizeof(TransactionId) * snap->subxcnt;
3763  }
3764  break;
3765  }
3767  {
3768  Size size;
3769  char *data;
3770 
3771  /* account for the OIDs of truncated relations */
3772  size = sizeof(Oid) * change->data.truncate.nrelids;
3773  sz += size;
3774 
3775  /* make sure we have enough space */
3777 
3778  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3779  /* might have been reallocated above */
3780  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3781 
3782  memcpy(data, change->data.truncate.relids, size);
3783  data += size;
3784 
3785  break;
3786  }
3791  /* ReorderBufferChange contains everything important */
3792  break;
3793  }
3794 
3795  ondisk->size = sz;
3796 
3797  errno = 0;
3798  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3799  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3800  {
3801  int save_errno = errno;
3802 
3804 
3805  /* if write didn't set errno, assume problem is no disk space */
3806  errno = save_errno ? save_errno : ENOSPC;
3807  ereport(ERROR,
3809  errmsg("could not write to data file for XID %u: %m",
3810  txn->xid)));
3811  }
3812  pgstat_report_wait_end();
3813 
3814  /*
3815  * Keep the transaction's final_lsn up to date with each change we send to
3816  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3817  * only do this on commit and abort records, but that doesn't work if a
3818  * system crash leaves a transaction without its abort record).
3819  *
3820  * Make sure not to move it backwards.
3821  */
3822  if (txn->final_lsn < change->lsn)
3823  txn->final_lsn = change->lsn;
3824 
3825  Assert(ondisk->change.action == change->action);
3826 }
#define write(a, b, c)
Definition: win32.h:14
@ WAIT_EVENT_REORDER_BUFFER_WRITE
Definition: wait_event.h:199

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::lsn, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4445 of file reorderbuffer.c.

4447 {
4448  XLogRecPtr recptr;
4449 
4450  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4451 
4452  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4453  NameStr(MyReplicationSlot->data.name),
4454  xid, LSN_FORMAT_ARGS(recptr));
4455 }
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 3345 of file reorderbuffer.c.

3346 {
3347  if (!rb->outbufsize)
3348  {
3349  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3350  rb->outbufsize = sz;
3351  }
3352  else if (rb->outbufsize < sz)
3353  {
3354  rb->outbuf = repalloc(rb->outbuf, sz);
3355  rb->outbufsize = sz;
3356  }
3357 }

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3523 of file reorderbuffer.c.

3524 {
3525  dlist_iter subtxn_i;
3526  dlist_mutable_iter change_i;
3527  int fd = -1;
3528  XLogSegNo curOpenSegNo = 0;
3529  Size spilled = 0;
3530  Size size = txn->size;
3531 
3532  elog(DEBUG2, "spill %u changes in XID %u to disk",
3533  (uint32) txn->nentries_mem, txn->xid);
3534 
3535  /* do the same to all child TXs */
3536  dlist_foreach(subtxn_i, &txn->subtxns)
3537  {
3538  ReorderBufferTXN *subtxn;
3539 
3540  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3541  ReorderBufferSerializeTXN(rb, subtxn);
3542  }
3543 
3544  /* serialize changestream */
3545  dlist_foreach_modify(change_i, &txn->changes)
3546  {
3547  ReorderBufferChange *change;
3548 
3549  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3550 
3551  /*
3552  * store in segment in which it belongs by start lsn, don't split over
3553  * multiple segments tho
3554  */
3555  if (fd == -1 ||
3556  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3557  {
3558  char path[MAXPGPATH];
3559 
3560  if (fd != -1)
3561  CloseTransientFile(fd);
3562 
3563  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3564 
3565  /*
3566  * No need to care about TLIs here, only used during a single run,
3567  * so each LSN only maps to a specific WAL record.
3568  */
3569  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3570  curOpenSegNo);
3571 
3572  /* open segment, create it if necessary */
3573  fd = OpenTransientFile(path,
3574  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3575 
3576  if (fd < 0)
3577  ereport(ERROR,
3579  errmsg("could not open file \"%s\": %m", path)));
3580  }
3581 
3582  ReorderBufferSerializeChange(rb, txn, fd, change);
3583  dlist_delete(&change->node);
3584  ReorderBufferReturnChange(rb, change, true);
3585 
3586  spilled++;
3587  }
3588 
3589  /* update the statistics iff we have spilled anything */
3590  if (spilled)
3591  {
3592  rb->spillCount += 1;
3593  rb->spillBytes += size;
3594 
3595  /* don't consider already serialized transactions */
3596  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3597 
3598  /* update the decoding stats */
3599  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3600  }
3601 
3602  Assert(spilled == txn->nentries_mem);
3603  Assert(dlist_is_empty(&txn->changes));
3604  txn->nentries_mem = 0;
3605  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3606 
3607  if (fd != -1)
3608  CloseTransientFile(fd);
3609 }
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1833
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferIterTXNInit().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3054 of file reorderbuffer.c.

3056 {
3057  ReorderBufferTXN *txn;
3058  bool is_new;
3059 
3060  AssertArg(snap != NULL);
3061 
3062  /*
3063  * Fetch the transaction to operate on. If we know it's a subtransaction,
3064  * operate on its top-level transaction instead.
3065  */
3066  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3067  if (rbtxn_is_known_subxact(txn))
3068  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3069  NULL, InvalidXLogRecPtr, false);
3070  Assert(txn->base_snapshot == NULL);
3071 
3072  txn->base_snapshot = snap;
3073  txn->base_snapshot_lsn = lsn;
3074  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3075 
3076  AssertTXNLsnOrder(rb);
3077 }
#define AssertArg(condition)
Definition: c.h:806

References Assert(), AssertArg, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1009 of file reorderbuffer.c.

1010 {
1011  rb->current_restart_decoding_lsn = ptr;
1012 }

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2688 of file reorderbuffer.c.

2689 {
2690  ReorderBufferTXN *txn;
2691 
2692  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2693 
2694  /* unknown transaction, nothing to do */
2695  if (txn == NULL)
2696  return;
2697 
2698  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2699 }
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1830 of file reorderbuffer.c.

1831 {
1832  /* we should only call this for previously streamed transactions */
1833  Assert(rbtxn_is_streamed(txn));
1834 
1835  ReorderBufferStreamTXN(rb, txn);
1836 
1837  if (rbtxn_prepared(txn))
1838  {
1839  /*
1840  * Note, we send stream prepare even if a concurrent abort is
1841  * detected. See DecodePrepare for more information.
1842  */
1843  rb->stream_prepare(rb, txn, txn->final_lsn);
1844 
1845  /*
1846  * This is a PREPARED transaction, part of a two-phase commit. The
1847  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1848  * just truncate txn by removing changes and tuple_cids.
1849  */
1850  ReorderBufferTruncateTXN(rb, txn, true);
1851  /* Reset the CheckXidAlive */
1852  CheckXidAlive = InvalidTransactionId;
1853  }
1854  else
1855  {
1856  rb->stream_commit(rb, txn, txn->final_lsn);
1857  ReorderBufferCleanupTXN(rb, txn);
1858  }
1859 }
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 3865 of file reorderbuffer.c.

3866 {
3867  Snapshot snapshot_now;
3868  CommandId command_id;
3869  Size stream_bytes;
3870  bool txn_is_streamed;
3871 
3872  /* We can never reach here for a subtransaction. */
3873  Assert(txn->toptxn == NULL);
3874 
3875  /*
3876  * We can't make any assumptions about base snapshot here, similar to what
3877  * ReorderBufferCommit() does. That relies on base_snapshot getting
3878  * transferred from subxact in ReorderBufferCommitChild(), but that was
3879  * not yet called as the transaction is in-progress.
3880  *
3881  * So just walk the subxacts and use the same logic here. But we only need
3882  * to do that once, when the transaction is streamed for the first time.
3883  * After that we need to reuse the snapshot from the previous run.
3884  *
3885  * Unlike DecodeCommit which adds xids of all the subtransactions in
3886  * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3887  * but we do add them to subxip array instead via ReorderBufferCopySnap.
3888  * This allows the catalog changes made in subtransactions decoded till
3889  * now to be visible.
3890  */
3891  if (txn->snapshot_now == NULL)
3892  {
3893  dlist_iter subxact_i;
3894 
3895  /* make sure this transaction is streamed for the first time */
3896  Assert(!rbtxn_is_streamed(txn));
3897 
3898  /* at the beginning we should have invalid command ID */
3899  Assert(txn->command_id == InvalidCommandId);
3900 
3901  dlist_foreach(subxact_i, &txn->subtxns)
3902  {
3903  ReorderBufferTXN *subtxn;
3904 
3905  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3906  ReorderBufferTransferSnapToParent(txn, subtxn);
3907  }
3908 
3909  /*
3910  * If this transaction has no snapshot, it didn't make any changes to
3911  * the database till now, so there's nothing to decode.
3912  */
3913  if (txn->base_snapshot == NULL)
3914  {
3915  Assert(txn->ninvalidations == 0);
3916  return;
3917  }
3918 
3919  command_id = FirstCommandId;
3920  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3921  txn, command_id);
3922  }
3923  else
3924  {
3925  /* the transaction must have been already streamed */
3926  Assert(rbtxn_is_streamed(txn));
3927 
3928  /*
3929  * Nah, we already have snapshot from the previous streaming run. We
3930  * assume new subxacts can't move the LSN backwards, and so can't beat
3931  * the LSN condition in the previous branch (so no need to walk
3932  * through subxacts again). In fact, we must not do that as we may be
3933  * using snapshot half-way through the subxact.
3934  */
3935  command_id = txn->command_id;
3936 
3937  /*
3938  * We can't use txn->snapshot_now directly because after the last
3939  * streaming run, we might have got some new sub-transactions. So we
3940  * need to add them to the snapshot.
3941  */
3942  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3943  txn, command_id);
3944 
3945  /* Free the previously copied snapshot. */
3946  Assert(txn->snapshot_now->copied);
3947  ReorderBufferFreeSnap(rb, txn->snapshot_now);
3948  txn->snapshot_now = NULL;
3949  }
3950 
3951  /*
3952  * Remember this information to be used later to update stats. We can't
3953  * update the stats here as an error while processing the changes would
3954  * lead to the accumulation of stats even though we haven't streamed all
3955  * the changes.
3956  */
3957  txn_is_streamed = rbtxn_is_streamed(txn);
3958  stream_bytes = txn->total_size;
3959 
3960  /* Process and send the changes to output plugin. */
3961  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3962  command_id, true);
3963 
3964  rb->streamCount += 1;
3965  rb->streamBytes += stream_bytes;
3966 
3967  /* Don't consider already streamed transaction. */
3968  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3969 
3970  /* update the decoding stats */
3971  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3972 
3973  Assert(dlist_is_empty(&txn->changes));
3974  Assert(txn->nentries == 0);
3975  Assert(txn->nentries_mem == 0);
3976 }

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::toptxn, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb