PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct
ReorderBufferTXNByIdEnt 
ReorderBufferTXNByIdEnt
 
typedef struct
ReorderBufferTupleCidKey 
ReorderBufferTupleCidKey
 
typedef struct
ReorderBufferTupleCidEnt 
ReorderBufferTupleCidEnt
 
typedef struct
ReorderBufferIterTXNEntry 
ReorderBufferIterTXNEntry
 
typedef struct
ReorderBufferIterTXNState 
ReorderBufferIterTXNState
 
typedef struct
ReorderBufferToastEnt 
ReorderBufferToastEnt
 
typedef struct
ReorderBufferDiskChange 
ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNStateReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const void *a_p, const void *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 
static const Size max_cached_changes = 4096 * 2
 
static const Size max_cached_tuplebufs = 4096 * 2
 
static const Size max_cached_transactions = 512
 

Typedef Documentation

Function Documentation

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3067 of file reorderbuffer.c.

References Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, NULL, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, read, ReorderBufferTupleCidKey::relnode, and ReorderBufferTupleCidKey::tid.

Referenced by UpdateLogicalMappings().

3068 {
3069  char path[MAXPGPATH];
3070  int fd;
3071  int readBytes;
3073 
3074  sprintf(path, "pg_logical/mappings/%s", fname);
3075  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3076  if (fd < 0)
3077  ereport(ERROR,
3079  errmsg("could not open file \"%s\": %m", path)));
3080 
3081  while (true)
3082  {
3085  ReorderBufferTupleCidEnt *new_ent;
3086  bool found;
3087 
3088  /* be careful about padding */
3089  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3090 
3091  /* read all mappings till the end of the file */
3092  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3093 
3094  if (readBytes < 0)
3095  ereport(ERROR,
3097  errmsg("could not read file \"%s\": %m",
3098  path)));
3099  else if (readBytes == 0) /* EOF */
3100  break;
3101  else if (readBytes != sizeof(LogicalRewriteMappingData))
3102  ereport(ERROR,
3104  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3105  path, readBytes,
3106  (int32) sizeof(LogicalRewriteMappingData))));
3107 
3108  key.relnode = map.old_node;
3109  ItemPointerCopy(&map.old_tid,
3110  &key.tid);
3111 
3112 
3113  ent = (ReorderBufferTupleCidEnt *)
3114  hash_search(tuplecid_data,
3115  (void *) &key,
3116  HASH_FIND,
3117  NULL);
3118 
3119  /* no existing mapping, no need to update */
3120  if (!ent)
3121  continue;
3122 
3123  key.relnode = map.new_node;
3124  ItemPointerCopy(&map.new_tid,
3125  &key.tid);
3126 
3127  new_ent = (ReorderBufferTupleCidEnt *)
3128  hash_search(tuplecid_data,
3129  (void *) &key,
3130  HASH_ENTER,
3131  &found);
3132 
3133  if (found)
3134  {
3135  /*
3136  * Make sure the existing mapping makes sense. We sometime update
3137  * old records that did not yet have a cmax (e.g. pg_class' own
3138  * entry while rewriting it) during rewrites, so allow that.
3139  */
3140  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3141  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3142  }
3143  else
3144  {
3145  /* update mapping */
3146  new_ent->cmin = ent->cmin;
3147  new_ent->cmax = ent->cmax;
3148  new_ent->combocid = ent->combocid;
3149  }
3150  }
3151 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
signed int int32
Definition: c.h:253
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvalidCommandId
Definition: c.h:411
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define read(a, b, c)
Definition: win32.h:18
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
ItemPointerData old_tid
Definition: rewriteheap.h:39
static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 692 of file reorderbuffer.c.

References Assert, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by ReorderBufferGetOldestTXN(), and ReorderBufferTXNByXid().

693 {
694 #ifdef USE_ASSERT_CHECKING
695  dlist_iter iter;
696  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
697 
698  dlist_foreach(iter, &rb->toplevel_by_lsn)
699  {
700  ReorderBufferTXN *cur_txn;
701 
702  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
703  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
704 
705  if (cur_txn->end_lsn != InvalidXLogRecPtr)
706  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
707 
708  if (prev_first_lsn != InvalidXLogRecPtr)
709  Assert(prev_first_lsn < cur_txn->first_lsn);
710 
711  Assert(!cur_txn->is_known_as_subxact);
712  prev_first_lsn = cur_txn->first_lsn;
713  }
714 #endif
715 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
XLogRecPtr end_lsn
static int file_sort_by_lsn ( const void *  a_p,
const void *  b_p 
)
static

Definition at line 3168 of file reorderbuffer.c.

References RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3169 {
3170  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3171  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3172 
3173  if (a->lsn < b->lsn)
3174  return -1;
3175  else if (a->lsn > b->lsn)
3176  return 1;
3177  return 0;
3178 }
void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1724 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, NULL, ReorderBufferCleanupTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

1725 {
1726  ReorderBufferTXN *txn;
1727 
1728  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1729  false);
1730 
1731  /* unknown, nothing to remove */
1732  if (txn == NULL)
1733  return;
1734 
1735  /* cosmetic... */
1736  txn->final_lsn = lsn;
1737 
1738  /* remove potential on-disk data, and deallocate */
1739  ReorderBufferCleanupTXN(rb, txn);
1740 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1750 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1751 {
1752  dlist_mutable_iter it;
1753 
1754  /*
1755  * Iterate through all (potential) toplevel TXNs and abort all that are
1756  * older than what possibly can be running. Once we've found the first
1757  * that is alive we stop, there might be some that acquired an xid earlier
1758  * but started writing later, but it's unlikely and they will cleaned up
1759  * in a later call to ReorderBufferAbortOld().
1760  */
1762  {
1763  ReorderBufferTXN *txn;
1764 
1765  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1766 
1767  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1768  {
1769  elog(DEBUG1, "aborting old transaction %u", txn->xid);
1770 
1771  /* remove potential on-disk data, and deallocate this tx */
1772  ReorderBufferCleanupTXN(rb, txn);
1773  }
1774  else
1775  return;
1776  }
1777 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog
Definition: elog.h:219
void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 1962 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1965 {
1966  ReorderBufferTXN *txn;
1967 
1968  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1969 
1970  if (txn->ninvalidations != 0)
1971  elog(ERROR, "only ever add one set of invalidations");
1972 
1973  Assert(nmsgs > 0);
1974 
1975  txn->ninvalidations = nmsgs;
1978  sizeof(SharedInvalidationMessage) * nmsgs);
1979  memcpy(txn->invalidations, msgs,
1980  sizeof(SharedInvalidationMessage) * nmsgs);
1981 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
#define elog
Definition: elog.h:219
void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1918 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1920 {
1922 
1923  change->data.command_id = cid;
1925 
1926  ReorderBufferQueueChange(rb, xid, lsn, change);
1927 }
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1934 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, and ReorderBufferTXN::tuplecids.

Referenced by SnapBuildProcessNewCid().

1938 {
1940  ReorderBufferTXN *txn;
1941 
1942  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1943 
1944  change->data.tuplecid.node = node;
1945  change->data.tuplecid.tid = tid;
1946  change->data.tuplecid.cmin = cmin;
1947  change->data.tuplecid.cmax = cmax;
1948  change->data.tuplecid.combocid = combocid;
1949  change->lsn = lsn;
1951 
1952  dlist_push_tail(&txn->tuplecids, &change->node);
1953  txn->ntuplecids++;
1954 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@52 tuplecid
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids
void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1878 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

1880 {
1882 
1883  change->data.snapshot = snap;
1885 
1886  ReorderBufferQueueChange(rb, xid, lsn, change);
1887 }
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 226 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_changes, ReorderBuffer::cached_transactions, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::nr_cached_changes, ReorderBuffer::nr_cached_transactions, ReorderBuffer::nr_cached_tuplebufs, NULL, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, slist_init(), and ReorderBuffer::toplevel_by_lsn.

Referenced by StartupDecodingContext().

227 {
228  ReorderBuffer *buffer;
229  HASHCTL hash_ctl;
230  MemoryContext new_ctx;
231 
232  /* allocate memory in own context, to have better accountability */
234  "ReorderBuffer",
236 
237  buffer =
238  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
239 
240  memset(&hash_ctl, 0, sizeof(hash_ctl));
241 
242  buffer->context = new_ctx;
243 
244  hash_ctl.keysize = sizeof(TransactionId);
245  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
246  hash_ctl.hcxt = buffer->context;
247 
248  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
250 
252  buffer->by_txn_last_txn = NULL;
253 
254  buffer->nr_cached_transactions = 0;
255  buffer->nr_cached_changes = 0;
256  buffer->nr_cached_tuplebufs = 0;
257 
258  buffer->outbuf = NULL;
259  buffer->outbufsize = 0;
260 
262 
263  dlist_init(&buffer->toplevel_by_lsn);
265  dlist_init(&buffer->cached_changes);
266  slist_init(&buffer->cached_tuplebufs);
267 
268  return buffer;
269 }
dlist_head cached_changes
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:394
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
static void slist_init(slist_head *head)
Definition: ilist.h:554
Size nr_cached_transactions
Size nr_cached_changes
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
dlist_head cached_transactions
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:226
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
slist_head cached_tuplebufs
void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 741 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeXactOp().

743 {
744  ReorderBufferTXN *txn;
745  ReorderBufferTXN *subtxn;
746  bool new_top;
747  bool new_sub;
748 
749  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
750  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
751 
752  if (new_sub)
753  {
754  /*
755  * we assign subtransactions to top level transaction even if we don't
756  * have data for it yet, assignment records frequently reference xids
757  * that have not yet produced any records. Knowing those aren't top
758  * level xids allows us to make processing cheaper in some places.
759  */
760  dlist_push_tail(&txn->subtxns, &subtxn->node);
761  txn->nsubtxns++;
762  }
763  else if (!subtxn->is_known_as_subxact)
764  {
765  subtxn->is_known_as_subxact = true;
766  Assert(subtxn->nsubtxns == 0);
767 
768  /* remove from lsn order list of top-level transactions */
769  dlist_delete(&subtxn->node);
770 
771  /* add to toplevel transaction */
772  dlist_push_tail(&txn->subtxns, &subtxn->node);
773  txn->nsubtxns++;
774  }
775  else if (new_top)
776  {
777  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
778  }
779 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:671
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1198 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1199 {
1200  dlist_iter iter;
1201  HASHCTL hash_ctl;
1202 
1203  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1204  return;
1205 
1206  memset(&hash_ctl, 0, sizeof(hash_ctl));
1207 
1208  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1209  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1210  hash_ctl.hcxt = rb->context;
1211 
1212  /*
1213  * create the hash with the exact number of to-be-stored tuplecids from
1214  * the start
1215  */
1216  txn->tuplecid_hash =
1217  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1219 
1220  dlist_foreach(iter, &txn->tuplecids)
1221  {
1224  bool found;
1225  ReorderBufferChange *change;
1226 
1227  change = dlist_container(ReorderBufferChange, node, iter.cur);
1228 
1230 
1231  /* be careful about padding */
1232  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1233 
1234  key.relnode = change->data.tuplecid.node;
1235 
1236  ItemPointerCopy(&change->data.tuplecid.tid,
1237  &key.tid);
1238 
1239  ent = (ReorderBufferTupleCidEnt *)
1241  (void *) &key,
1243  &found);
1244  if (!found)
1245  {
1246  ent->cmin = change->data.tuplecid.cmin;
1247  ent->cmax = change->data.tuplecid.cmax;
1248  ent->combocid = change->data.tuplecid.combocid;
1249  }
1250  else
1251  {
1252  Assert(ent->cmin == change->data.tuplecid.cmin);
1253  Assert(ent->cmax == InvalidCommandId ||
1254  ent->cmax == change->data.tuplecid.cmax);
1255 
1256  /*
1257  * if the tuple got valid in this transaction and now got deleted
1258  * we already have a valid cmin stored. The cmax will be
1259  * InvalidCommandId though.
1260  */
1261  ent->cmax = change->data.tuplecid.cmax;
1262  }
1263  }
1264 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
union ReorderBufferChange::@49 data
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferChange::@49::@52 tuplecid
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:411
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
static void ReorderBufferCheckSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2079 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2080 {
2081  /*
2082  * TODO: improve accounting so we cheaply can take subtransactions into
2083  * account here.
2084  */
2085  if (txn->nentries_mem >= max_changes_in_memory)
2086  {
2087  ReorderBufferSerializeTXN(rb, txn);
2088  Assert(txn->nentries_mem == 0);
2089  }
2090 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:671
static const Size max_changes_in_memory
static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1115 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1116 {
1117  bool found;
1118  dlist_mutable_iter iter;
1119 
1120  /* cleanup subtransactions & their changes */
1121  dlist_foreach_modify(iter, &txn->subtxns)
1122  {
1123  ReorderBufferTXN *subtxn;
1124 
1125  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1126 
1127  /*
1128  * Subtransactions are always associated to the toplevel TXN, even if
1129  * they originally were happening inside another subtxn, so we won't
1130  * ever recurse more than one level deep here.
1131  */
1132  Assert(subtxn->is_known_as_subxact);
1133  Assert(subtxn->nsubtxns == 0);
1134 
1135  ReorderBufferCleanupTXN(rb, subtxn);
1136  }
1137 
1138  /* cleanup changes in the toplevel txn */
1139  dlist_foreach_modify(iter, &txn->changes)
1140  {
1141  ReorderBufferChange *change;
1142 
1143  change = dlist_container(ReorderBufferChange, node, iter.cur);
1144 
1145  ReorderBufferReturnChange(rb, change);
1146  }
1147 
1148  /*
1149  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1150  * They are always stored in the toplevel transaction.
1151  */
1152  dlist_foreach_modify(iter, &txn->tuplecids)
1153  {
1154  ReorderBufferChange *change;
1155 
1156  change = dlist_container(ReorderBufferChange, node, iter.cur);
1158  ReorderBufferReturnChange(rb, change);
1159  }
1160 
1161  if (txn->base_snapshot != NULL)
1162  {
1164  txn->base_snapshot = NULL;
1166  }
1167 
1168  /*
1169  * Remove TXN from its containing list.
1170  *
1171  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1172  * parent's list of known subxacts; this leaves the parent's nsubxacts
1173  * count too high, but we don't care. Otherwise, we are deleting the TXN
1174  * from the LSN-ordered list of toplevel TXNs.
1175  */
1176  dlist_delete(&txn->node);
1177 
1178  /* now remove reference from buffer */
1179  hash_search(rb->by_txn,
1180  (void *) &txn->xid,
1181  HASH_REMOVE,
1182  &found);
1183  Assert(found);
1184 
1185  /* remove entries spilled to disk */
1186  if (txn->nentries != txn->nentries_mem)
1187  ReorderBufferRestoreCleanup(rb, txn);
1188 
1189  /* deallocate */
1190  ReorderBufferReturnTXN(rb, txn);
1191 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids
void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1353 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, NULL, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, and ReorderBufferTXN::tuplecid_hash.

Referenced by DecodeCommit().

1357 {
1358  ReorderBufferTXN *txn;
1359  volatile Snapshot snapshot_now;
1360  volatile CommandId command_id = FirstCommandId;
1361  bool using_subtxn;
1362  ReorderBufferIterTXNState *volatile iterstate = NULL;
1363 
1364  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1365  false);
1366 
1367  /* unknown transaction, nothing to replay */
1368  if (txn == NULL)
1369  return;
1370 
1371  txn->final_lsn = commit_lsn;
1372  txn->end_lsn = end_lsn;
1373  txn->commit_time = commit_time;
1374  txn->origin_id = origin_id;
1375  txn->origin_lsn = origin_lsn;
1376 
1377  /*
1378  * If this transaction didn't have any real changes in our database, it's
1379  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1380  * transferred its snapshot to this transaction if it had one and the
1381  * toplevel tx didn't.
1382  */
1383  if (txn->base_snapshot == NULL)
1384  {
1385  Assert(txn->ninvalidations == 0);
1386  ReorderBufferCleanupTXN(rb, txn);
1387  return;
1388  }
1389 
1390  snapshot_now = txn->base_snapshot;
1391 
1392  /* build data to be able to lookup the CommandIds of catalog tuples */
1394 
1395  /* setup the initial snapshot */
1396  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1397 
1398  /*
1399  * Decoding needs access to syscaches et al., which in turn use
1400  * heavyweight locks and such. Thus we need to have enough state around to
1401  * keep track of those. The easiest way is to simply use a transaction
1402  * internally. That also allows us to easily enforce that nothing writes
1403  * to the database by checking for xid assignments.
1404  *
1405  * When we're called via the SQL SRF there's already a transaction
1406  * started, so start an explicit subtransaction there.
1407  */
1408  using_subtxn = IsTransactionOrTransactionBlock();
1409 
1410  PG_TRY();
1411  {
1412  ReorderBufferChange *change;
1413  ReorderBufferChange *specinsert = NULL;
1414 
1415  if (using_subtxn)
1416  BeginInternalSubTransaction("replay");
1417  else
1419 
1420  rb->begin(rb, txn);
1421 
1422  iterstate = ReorderBufferIterTXNInit(rb, txn);
1423  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1424  {
1425  Relation relation = NULL;
1426  Oid reloid;
1427 
1428  switch (change->action)
1429  {
1431 
1432  /*
1433  * Confirmation for speculative insertion arrived. Simply
1434  * use as a normal record. It'll be cleaned up at the end
1435  * of INSERT processing.
1436  */
1437  Assert(specinsert->data.tp.oldtuple == NULL);
1438  change = specinsert;
1440 
1441  /* intentionally fall through */
1445  Assert(snapshot_now);
1446 
1447  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1448  change->data.tp.relnode.relNode);
1449 
1450  /*
1451  * Catalog tuple without data, emitted while catalog was
1452  * in the process of being rewritten.
1453  */
1454  if (reloid == InvalidOid &&
1455  change->data.tp.newtuple == NULL &&
1456  change->data.tp.oldtuple == NULL)
1457  goto change_done;
1458  else if (reloid == InvalidOid)
1459  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1460  relpathperm(change->data.tp.relnode,
1461  MAIN_FORKNUM));
1462 
1463  relation = RelationIdGetRelation(reloid);
1464 
1465  if (relation == NULL)
1466  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1467  reloid,
1468  relpathperm(change->data.tp.relnode,
1469  MAIN_FORKNUM));
1470 
1471  if (!RelationIsLogicallyLogged(relation))
1472  goto change_done;
1473 
1474  /*
1475  * For now ignore sequence changes entirely. Most of the
1476  * time they don't log changes using records we
1477  * understand, so it doesn't make sense to handle the few
1478  * cases we do.
1479  */
1480  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1481  goto change_done;
1482 
1483  /* user-triggered change */
1484  if (!IsToastRelation(relation))
1485  {
1486  ReorderBufferToastReplace(rb, txn, relation, change);
1487  rb->apply_change(rb, txn, relation, change);
1488 
1489  /*
1490  * Only clear reassembled toast chunks if we're sure
1491  * they're not required anymore. The creator of the
1492  * tuple tells us.
1493  */
1494  if (change->data.tp.clear_toast_afterwards)
1495  ReorderBufferToastReset(rb, txn);
1496  }
1497  /* we're not interested in toast deletions */
1498  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1499  {
1500  /*
1501  * Need to reassemble the full toasted Datum in
1502  * memory, to ensure the chunks don't get reused till
1503  * we're done remove it from the list of this
1504  * transaction's changes. Otherwise it will get
1505  * freed/reused while restoring spooled data from
1506  * disk.
1507  */
1508  dlist_delete(&change->node);
1509  ReorderBufferToastAppendChunk(rb, txn, relation,
1510  change);
1511  }
1512 
1513  change_done:
1514 
1515  /*
1516  * Either speculative insertion was confirmed, or it was
1517  * unsuccessful and the record isn't needed anymore.
1518  */
1519  if (specinsert != NULL)
1520  {
1521  ReorderBufferReturnChange(rb, specinsert);
1522  specinsert = NULL;
1523  }
1524 
1525  if (relation != NULL)
1526  {
1527  RelationClose(relation);
1528  relation = NULL;
1529  }
1530  break;
1531 
1533 
1534  /*
1535  * Speculative insertions are dealt with by delaying the
1536  * processing of the insert until the confirmation record
1537  * arrives. For that we simply unlink the record from the
1538  * chain, so it does not get freed/reused while restoring
1539  * spooled data from disk.
1540  *
1541  * This is safe in the face of concurrent catalog changes
1542  * because the relevant relation can't be changed between
1543  * speculative insertion and confirmation due to
1544  * CheckTableNotInUse() and locking.
1545  */
1546 
1547  /* clear out a pending (and thus failed) speculation */
1548  if (specinsert != NULL)
1549  {
1550  ReorderBufferReturnChange(rb, specinsert);
1551  specinsert = NULL;
1552  }
1553 
1554  /* and memorize the pending insertion */
1555  dlist_delete(&change->node);
1556  specinsert = change;
1557  break;
1558 
1560  rb->message(rb, txn, change->lsn, true,
1561  change->data.msg.prefix,
1562  change->data.msg.message_size,
1563  change->data.msg.message);
1564  break;
1565 
1567  /* get rid of the old */
1568  TeardownHistoricSnapshot(false);
1569 
1570  if (snapshot_now->copied)
1571  {
1572  ReorderBufferFreeSnap(rb, snapshot_now);
1573  snapshot_now =
1574  ReorderBufferCopySnap(rb, change->data.snapshot,
1575  txn, command_id);
1576  }
1577 
1578  /*
1579  * Restored from disk, need to be careful not to double
1580  * free. We could introduce refcounting for that, but for
1581  * now this seems infrequent enough not to care.
1582  */
1583  else if (change->data.snapshot->copied)
1584  {
1585  snapshot_now =
1586  ReorderBufferCopySnap(rb, change->data.snapshot,
1587  txn, command_id);
1588  }
1589  else
1590  {
1591  snapshot_now = change->data.snapshot;
1592  }
1593 
1594 
1595  /* and continue with the new one */
1596  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1597  break;
1598 
1600  Assert(change->data.command_id != InvalidCommandId);
1601 
1602  if (command_id < change->data.command_id)
1603  {
1604  command_id = change->data.command_id;
1605 
1606  if (!snapshot_now->copied)
1607  {
1608  /* we don't use the global one anymore */
1609  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1610  txn, command_id);
1611  }
1612 
1613  snapshot_now->curcid = command_id;
1614 
1615  TeardownHistoricSnapshot(false);
1616  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1617 
1618  /*
1619  * Every time the CommandId is incremented, we could
1620  * see new catalog contents, so execute all
1621  * invalidations.
1622  */
1624  }
1625 
1626  break;
1627 
1629  elog(ERROR, "tuplecid value in changequeue");
1630  break;
1631  }
1632  }
1633 
1634  /*
1635  * There's a speculative insertion remaining, just clean in up, it
1636  * can't have been successful, otherwise we'd gotten a confirmation
1637  * record.
1638  */
1639  if (specinsert)
1640  {
1641  ReorderBufferReturnChange(rb, specinsert);
1642  specinsert = NULL;
1643  }
1644 
1645  /* clean up the iterator */
1646  ReorderBufferIterTXNFinish(rb, iterstate);
1647  iterstate = NULL;
1648 
1649  /* call commit callback */
1650  rb->commit(rb, txn, commit_lsn);
1651 
1652  /* this is just a sanity check against bad output plugin behaviour */
1654  elog(ERROR, "output plugin used XID %u",
1656 
1657  /* cleanup */
1658  TeardownHistoricSnapshot(false);
1659 
1660  /*
1661  * Aborting the current (sub-)transaction as a whole has the right
1662  * semantics. We want all locks acquired in here to be released, not
1663  * reassigned to the parent and we do not want any database access
1664  * have persistent effects.
1665  */
1667 
1668  /* make sure there's no cache pollution */
1670 
1671  if (using_subtxn)
1673 
1674  if (snapshot_now->copied)
1675  ReorderBufferFreeSnap(rb, snapshot_now);
1676 
1677  /* remove potential on-disk data, and deallocate */
1678  ReorderBufferCleanupTXN(rb, txn);
1679  }
1680  PG_CATCH();
1681  {
1682  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1683  if (iterstate)
1684  ReorderBufferIterTXNFinish(rb, iterstate);
1685 
1687 
1688  /*
1689  * Force cache invalidation to happen outside of a valid transaction
1690  * to prevent catalog access as we just caught an error.
1691  */
1693 
1694  /* make sure there's no cache pollution */
1696 
1697  if (using_subtxn)
1699 
1700  if (snapshot_now->copied)
1701  ReorderBufferFreeSnap(rb, snapshot_now);
1702 
1703  /* remove potential on-disk data, and deallocate */
1704  ReorderBufferCleanupTXN(rb, txn);
1705 
1706  PG_RE_THROW();
1707  }
1708  PG_END_TRY();
1709 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:408
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:93
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@49 data
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1962
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:572
Form_pg_class rd_rel
Definition: rel.h:113
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:410
struct ReorderBufferChange::@49::@51 msg
struct ReorderBufferChange::@49::@50 tp
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:416
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4155
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:433
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2155
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:411
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:95
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2675
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4051
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1946
#define elog
Definition: elog.h:219
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2066
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 786 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeCommit().

789 {
790  ReorderBufferTXN *txn;
791  ReorderBufferTXN *subtxn;
792 
793  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
794  InvalidXLogRecPtr, false);
795 
796  /*
797  * No need to do anything if that subtxn didn't contain any changes
798  */
799  if (!subtxn)
800  return;
801 
802  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
803 
804  if (txn == NULL)
805  elog(ERROR, "subxact logged without previous toplevel record");
806 
807  /*
808  * Pass our base snapshot to the parent transaction if it doesn't have
809  * one, or ours is older. That can happen if there are no changes in the
810  * toplevel transaction but in one of the child transactions. This allows
811  * the parent to simply use its base snapshot initially.
812  */
813  if (subtxn->base_snapshot != NULL &&
814  (txn->base_snapshot == NULL ||
815  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
816  {
817  txn->base_snapshot = subtxn->base_snapshot;
818  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
819  subtxn->base_snapshot = NULL;
821  }
822 
823  subtxn->final_lsn = commit_lsn;
824  subtxn->end_lsn = end_lsn;
825 
826  if (!subtxn->is_known_as_subxact)
827  {
828  subtxn->is_known_as_subxact = true;
829  Assert(subtxn->nsubtxns == 0);
830 
831  /* remove from lsn order list of top-level transactions */
832  dlist_delete(&subtxn->node);
833 
834  /* add to subtransaction list */
835  dlist_push_tail(&txn->subtxns, &subtxn->node);
836  txn->nsubtxns++;
837  }
838 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1272 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1274 {
1275  Snapshot snap;
1276  dlist_iter iter;
1277  int i = 0;
1278  Size size;
1279 
1280  size = sizeof(SnapshotData) +
1281  sizeof(TransactionId) * orig_snap->xcnt +
1282  sizeof(TransactionId) * (txn->nsubtxns + 1);
1283 
1284  snap = MemoryContextAllocZero(rb->context, size);
1285  memcpy(snap, orig_snap, sizeof(SnapshotData));
1286 
1287  snap->copied = true;
1288  snap->active_count = 1; /* mark as active so nobody frees it */
1289  snap->regd_count = 0;
1290  snap->xip = (TransactionId *) (snap + 1);
1291 
1292  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1293 
1294  /*
1295  * snap->subxip contains all txids that belong to our transaction which we
1296  * need to check via cmin/cmax. Thats why we store the toplevel
1297  * transaction in there as well.
1298  */
1299  snap->subxip = snap->xip + snap->xcnt;
1300  snap->subxip[i++] = txn->xid;
1301 
1302  /*
1303  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1304  * Since it's an upper boundary it is safe to use it for the allocation
1305  * above.
1306  */
1307  snap->subxcnt = 1;
1308 
1309  dlist_foreach(iter, &txn->subtxns)
1310  {
1311  ReorderBufferTXN *sub_txn;
1312 
1313  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1314  snap->subxip[i++] = sub_txn->xid;
1315  snap->subxcnt++;
1316  }
1317 
1318  /* sort so we can bsearch() later */
1319  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1320 
1321  /* store the specified current CommandId */
1322  snap->curcid = cid;
1323 
1324  return snap;
1325 }
uint32 TransactionId
Definition: c.h:394
bool copied
Definition: snapshot.h:93
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:107
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:76
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
CommandId curcid
Definition: snapshot.h:95
size_t Size
Definition: c.h:353
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:77
int i
#define qsort(a, b, c, d)
Definition: port.h:440
TransactionId * subxip
Definition: snapshot.h:88
uint32 active_count
Definition: snapshot.h:106
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:89
static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1988 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

1989 {
1990  int i;
1991 
1992  for (i = 0; i < txn->ninvalidations; i++)
1994 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:545
int i
void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1793 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, NULL, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1794 {
1795  ReorderBufferTXN *txn;
1796 
1797  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1798  false);
1799 
1800  /* unknown, nothing to forget */
1801  if (txn == NULL)
1802  return;
1803 
1804  /* cosmetic... */
1805  txn->final_lsn = lsn;
1806 
1807  /*
1808  * Process cache invalidation messages if there are any. Even if we're not
1809  * interested in the transaction's contents, it could have manipulated the
1810  * catalog and we need to update the caches according to that.
1811  */
1812  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1814  txn->invalidations);
1815  else
1816  Assert(txn->ninvalidations == 0);
1817 
1818  /* remove potential on-disk data, and deallocate */
1819  ReorderBufferCleanupTXN(rb, txn);
1820 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 275 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

276 {
277  MemoryContext context = rb->context;
278 
279  /*
280  * We free separately allocated data by entirely scrapping reorderbuffer's
281  * memory context.
282  */
283  MemoryContextDelete(context);
284 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
MemoryContext context
static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1331 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1332 {
1333  if (snap->copied)
1334  pfree(snap);
1335  else
1337 }
bool copied
Definition: snapshot.h:93
void pfree(void *pointer)
Definition: mcxt.c:992
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 365 of file reorderbuffer.c.

References ReorderBuffer::cached_changes, ReorderBuffer::context, dlist_container, dlist_pop_head_node(), MemoryContextAlloc(), and ReorderBuffer::nr_cached_changes.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

366 {
367  ReorderBufferChange *change;
368 
369  /* check the slab cache */
370  if (rb->nr_cached_changes)
371  {
372  rb->nr_cached_changes--;
373  change = (ReorderBufferChange *)
376  }
377  else
378  {
379  change = (ReorderBufferChange *)
381  }
382 
383  memset(change, 0, sizeof(ReorderBufferChange));
384  return change;
385 }
dlist_head cached_changes
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
Size nr_cached_changes
MemoryContext context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 718 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, NULL, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

719 {
720  ReorderBufferTXN *txn;
721 
723  return NULL;
724 
725  AssertTXNLsnOrder(rb);
726 
728 
731  return txn;
732 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 457 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, Assert, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, MaxHeapTupleSize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBufferTupleBufData, SizeofHeapTupleHeader, slist_container, slist_pop_head_node(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

458 {
459  ReorderBufferTupleBuf *tuple;
460  Size alloc_len;
461 
462  alloc_len = tuple_len + SizeofHeapTupleHeader;
463 
464  /*
465  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
466  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
467  * generated for oldtuples can be bigger, as they don't have out-of-line
468  * toast columns.
469  */
470  if (alloc_len < MaxHeapTupleSize)
471  alloc_len = MaxHeapTupleSize;
472 
473 
474  /* if small enough, check the slab cache */
475  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
476  {
477  rb->nr_cached_tuplebufs--;
481 #ifdef USE_ASSERT_CHECKING
482  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
484 #endif
485  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
486 #ifdef USE_ASSERT_CHECKING
487  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
489 #endif
490  }
491  else
492  {
493  tuple = (ReorderBufferTupleBuf *)
495  sizeof(ReorderBufferTupleBuf) +
496  MAXIMUM_ALIGNOF + alloc_len);
497  tuple->alloc_tuple_size = alloc_len;
498  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
499  }
500 
501  return tuple;
502 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
#define Assert(condition)
Definition: c.h:671
size_t Size
Definition: c.h:353
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
slist_head cached_tuplebufs
static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 290 of file reorderbuffer.c.

References ReorderBuffer::cached_transactions, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_container, dlist_init(), dlist_pop_head_node(), MemoryContextAlloc(), ReorderBuffer::nr_cached_transactions, ReorderBufferTXN::subtxns, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferTXNByXid().

291 {
292  ReorderBufferTXN *txn;
293 
294  /* check the slab cache */
295  if (rb->nr_cached_transactions > 0)
296  {
298  txn = (ReorderBufferTXN *)
301  }
302  else
303  {
304  txn = (ReorderBufferTXN *)
306  }
307 
308  memset(txn, 0, sizeof(ReorderBufferTXN));
309 
310  dlist_init(&txn->changes);
311  dlist_init(&txn->tuplecids);
312  dlist_init(&txn->subtxns);
313 
314  return txn;
315 }
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
Size nr_cached_transactions
dlist_head changes
dlist_head cached_transactions
MemoryContext context
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
dlist_head tuplecids
void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1829 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1831 {
1832  bool use_subtxn = IsTransactionOrTransactionBlock();
1833  int i;
1834 
1835  if (use_subtxn)
1836  BeginInternalSubTransaction("replay");
1837 
1838  /*
1839  * Force invalidations to happen outside of a valid transaction - that way
1840  * entries will just be marked as invalid without accessing the catalog.
1841  * That's advantageous because we don't need to setup the full state
1842  * necessary for catalog access.
1843  */
1844  if (use_subtxn)
1846 
1847  for (i = 0; i < ninvalidations; i++)
1848  LocalExecuteInvalidationMessage(&invalidations[i]);
1849 
1850  if (use_subtxn)
1852 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4155
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4051
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:545
int i
static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 857 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

858 {
860  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
861  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
862 
863  if (pos_a < pos_b)
864  return 1;
865  else if (pos_a == pos_b)
866  return 0;
867  return -1;
868 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:480
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg
static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1084 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1086 {
1087  int32 off;
1088 
1089  for (off = 0; off < state->nr_txns; off++)
1090  {
1091  if (state->entries[off].fd != -1)
1092  CloseTransientFile(state->entries[off].fd);
1093  }
1094 
1095  /* free memory we might have "leaked" in the last *Next call */
1096  if (!dlist_is_empty(&state->old_change))
1097  {
1098  ReorderBufferChange *change;
1099 
1100  change = dlist_container(ReorderBufferChange, node,
1101  dlist_pop_head_node(&state->old_change));
1102  ReorderBufferReturnChange(rb, change);
1103  Assert(dlist_is_empty(&state->old_change));
1104  }
1105 
1106  binaryheap_free(state->heap);
1107  pfree(state);
1108 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:253
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:992
int CloseTransientFile(int fd)
Definition: fd.c:2254
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 875 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

876 {
877  Size nr_txns = 0;
879  dlist_iter cur_txn_i;
880  int32 off;
881 
882  /*
883  * Calculate the size of our heap: one element for every transaction that
884  * contains changes. (Besides the transactions already in the reorder
885  * buffer, we count the one we were directly passed.)
886  */
887  if (txn->nentries > 0)
888  nr_txns++;
889 
890  dlist_foreach(cur_txn_i, &txn->subtxns)
891  {
892  ReorderBufferTXN *cur_txn;
893 
894  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
895 
896  if (cur_txn->nentries > 0)
897  nr_txns++;
898  }
899 
900  /*
901  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
902  * need to allocate/build a heap then.
903  */
904 
905  /* allocate iteration state */
906  state = (ReorderBufferIterTXNState *)
908  sizeof(ReorderBufferIterTXNState) +
909  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
910 
911  state->nr_txns = nr_txns;
912  dlist_init(&state->old_change);
913 
914  for (off = 0; off < state->nr_txns; off++)
915  {
916  state->entries[off].fd = -1;
917  state->entries[off].segno = 0;
918  }
919 
920  /* allocate heap */
921  state->heap = binaryheap_allocate(state->nr_txns,
923  state);
924 
925  /*
926  * Now insert items into the binary heap, in an unordered fashion. (We
927  * will run a heap assembly step at the end; this is more efficient.)
928  */
929 
930  off = 0;
931 
932  /* add toplevel transaction if it contains changes */
933  if (txn->nentries > 0)
934  {
935  ReorderBufferChange *cur_change;
936 
937  if (txn->nentries != txn->nentries_mem)
938  {
939  /* serialize remaining changes */
940  ReorderBufferSerializeTXN(rb, txn);
941  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
942  &state->entries[off].segno);
943  }
944 
945  cur_change = dlist_head_element(ReorderBufferChange, node,
946  &txn->changes);
947 
948  state->entries[off].lsn = cur_change->lsn;
949  state->entries[off].change = cur_change;
950  state->entries[off].txn = txn;
951 
953  }
954 
955  /* add subtransactions if they contain changes */
956  dlist_foreach(cur_txn_i, &txn->subtxns)
957  {
958  ReorderBufferTXN *cur_txn;
959 
960  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
961 
962  if (cur_txn->nentries > 0)
963  {
964  ReorderBufferChange *cur_change;
965 
966  if (cur_txn->nentries != cur_txn->nentries_mem)
967  {
968  /* serialize remaining changes */
969  ReorderBufferSerializeTXN(rb, cur_txn);
970  ReorderBufferRestoreChanges(rb, cur_txn,
971  &state->entries[off].fd,
972  &state->entries[off].segno);
973  }
974  cur_change = dlist_head_element(ReorderBufferChange, node,
975  &cur_txn->changes);
976 
977  state->entries[off].lsn = cur_change->lsn;
978  state->entries[off].change = cur_change;
979  state->entries[off].txn = cur_txn;
980 
982  }
983  }
984 
985  /* assemble a valid binary heap */
986  binaryheap_build(state->heap);
987 
988  return state;
989 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:253
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:353
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:487
static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 998 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, NULL, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

999 {
1000  ReorderBufferChange *change;
1002  int32 off;
1003 
1004  /* nothing there anymore */
1005  if (state->heap->bh_size == 0)
1006  return NULL;
1007 
1008  off = DatumGetInt32(binaryheap_first(state->heap));
1009  entry = &state->entries[off];
1010 
1011  /* free memory we might have "leaked" in the previous *Next call */
1012  if (!dlist_is_empty(&state->old_change))
1013  {
1014  change = dlist_container(ReorderBufferChange, node,
1015  dlist_pop_head_node(&state->old_change));
1016  ReorderBufferReturnChange(rb, change);
1017  Assert(dlist_is_empty(&state->old_change));
1018  }
1019 
1020  change = entry->change;
1021 
1022  /*
1023  * update heap with information about which transaction has the next
1024  * relevant change in LSN order
1025  */
1026 
1027  /* there are in-memory changes */
1028  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1029  {
1030  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1031  ReorderBufferChange *next_change =
1032  dlist_container(ReorderBufferChange, node, next);
1033 
1034  /* txn stays the same */
1035  state->entries[off].lsn = next_change->lsn;
1036  state->entries[off].change = next_change;
1037 
1039  return change;
1040  }
1041 
1042  /* try to load changes from disk */
1043  if (entry->txn->nentries != entry->txn->nentries_mem)
1044  {
1045  /*
1046  * Ugly: restoring changes will reuse *Change records, thus delete the
1047  * current one from the per-tx list and only free in the next call.
1048  */
1049  dlist_delete(&change->node);
1050  dlist_push_tail(&state->old_change, &change->node);
1051 
1052  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1053  &state->entries[off].segno))
1054  {
1055  /* successfully restored changes from disk */
1056  ReorderBufferChange *next_change =
1058  &entry->txn->changes);
1059 
1060  elog(DEBUG2, "restored %u/%u changes from disk",
1061  (uint32) entry->txn->nentries_mem,
1062  (uint32) entry->txn->nentries);
1063 
1064  Assert(entry->txn->nentries_mem);
1065  /* txn stays the same */
1066  state->entries[off].lsn = next_change->lsn;
1067  state->entries[off].change = next_change;
1069 
1070  return change;
1071  }
1072  }
1073 
1074  /* ok, no changes there anymore, remove */
1075  binaryheap_remove_first(state->heap);
1076 
1077  return change;
1078 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:210
#define DatumGetInt32(X)
Definition: postgres.h:480
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:253
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:265
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:487
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define elog
Definition: elog.h:219
void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1865 of file reorderbuffer.c.

References InvalidTransactionId, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1866 {
1867  /* many records won't have an xid assigned, centralize check here */
1868  if (xid != InvalidTransactionId)
1869  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1870 }
#define InvalidTransactionId
Definition: transam.h:31
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 620 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, NULL, ReorderBufferCheckSerializeTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

622 {
623  ReorderBufferTXN *txn;
624 
625  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
626 
627  change->lsn = lsn;
628  Assert(InvalidXLogRecPtr != lsn);
629  dlist_push_tail(&txn->changes, &change->node);
630  txn->nentries++;
631  txn->nentries_mem++;
632 
634 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 640 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, NULL, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by DecodeLogicalMsgOp().

644 {
645  if (transactional)
646  {
647  MemoryContext oldcontext;
648  ReorderBufferChange *change;
649 
651 
652  oldcontext = MemoryContextSwitchTo(rb->context);
653 
654  change = ReorderBufferGetChange(rb);
656  change->data.msg.prefix = pstrdup(prefix);
657  change->data.msg.message_size = message_size;
658  change->data.msg.message = palloc(message_size);
659  memcpy(change->data.msg.message, message, message_size);
660 
661  ReorderBufferQueueChange(rb, xid, lsn, change);
662 
663  MemoryContextSwitchTo(oldcontext);
664  }
665  else
666  {
667  ReorderBufferTXN *txn = NULL;
668  volatile Snapshot snapshot_now = snapshot;
669 
670  if (xid != InvalidTransactionId)
671  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
672 
673  /* setup snapshot to allow catalog access */
674  SetupHistoricSnapshot(snapshot_now, NULL);
675  PG_TRY();
676  {
677  rb->message(rb, txn, lsn, false, prefix, message_size, message);
678 
680  }
681  PG_CATCH();
682  {
684  PG_RE_THROW();
685  }
686  PG_END_TRY();
687  }
688 }
char * pstrdup(const char *in)
Definition: mcxt.c:1165
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
union ReorderBufferChange::@49 data
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1962
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@51 msg
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:891
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1946
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2472 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2474 {
2475  ReorderBufferDiskChange *ondisk;
2476  ReorderBufferChange *change;
2477 
2478  ondisk = (ReorderBufferDiskChange *) data;
2479 
2480  change = ReorderBufferGetChange(rb);
2481 
2482  /* copy static part */
2483  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2484 
2485  data += sizeof(ReorderBufferDiskChange);
2486 
2487  /* restore individual stuff */
2488  switch (change->action)
2489  {
2490  /* fall through these, they're all similar enough */
2495  if (change->data.tp.oldtuple)
2496  {
2497  uint32 tuplelen = ((HeapTuple) data)->t_len;
2498 
2499  change->data.tp.oldtuple =
2501 
2502  /* restore ->tuple */
2503  memcpy(&change->data.tp.oldtuple->tuple, data,
2504  sizeof(HeapTupleData));
2505  data += sizeof(HeapTupleData);
2506 
2507  /* reset t_data pointer into the new tuplebuf */
2508  change->data.tp.oldtuple->tuple.t_data =
2509  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2510 
2511  /* restore tuple data itself */
2512  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2513  data += tuplelen;
2514  }
2515 
2516  if (change->data.tp.newtuple)
2517  {
2518  /* here, data might not be suitably aligned! */
2519  uint32 tuplelen;
2520 
2521  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2522  sizeof(uint32));
2523 
2524  change->data.tp.newtuple =
2526 
2527  /* restore ->tuple */
2528  memcpy(&change->data.tp.newtuple->tuple, data,
2529  sizeof(HeapTupleData));
2530  data += sizeof(HeapTupleData);
2531 
2532  /* reset t_data pointer into the new tuplebuf */
2533  change->data.tp.newtuple->tuple.t_data =
2534  ReorderBufferTupleBufData(change->data.tp.newtuple);
2535 
2536  /* restore tuple data itself */
2537  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2538  data += tuplelen;
2539  }
2540 
2541  break;
2543  {
2544  Size prefix_size;
2545 
2546  /* read prefix */
2547  memcpy(&prefix_size, data, sizeof(Size));
2548  data += sizeof(Size);
2549  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2550  prefix_size);
2551  memcpy(change->data.msg.prefix, data, prefix_size);
2552  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2553  data += prefix_size;
2554 
2555  /* read the message */
2556  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2557  data += sizeof(Size);
2558  change->data.msg.message = MemoryContextAlloc(rb->context,
2559  change->data.msg.message_size);
2560  memcpy(change->data.msg.message, data,
2561  change->data.msg.message_size);
2562  data += change->data.msg.message_size;
2563 
2564  break;
2565  }
2567  {
2568  Snapshot oldsnap;
2569  Snapshot newsnap;
2570  Size size;
2571 
2572  oldsnap = (Snapshot) data;
2573 
2574  size = sizeof(SnapshotData) +
2575  sizeof(TransactionId) * oldsnap->xcnt +
2576  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2577 
2578  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2579 
2580  newsnap = change->data.snapshot;
2581 
2582  memcpy(newsnap, data, size);
2583  newsnap->xip = (TransactionId *)
2584  (((char *) newsnap) + sizeof(SnapshotData));
2585  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2586  newsnap->copied = true;
2587  break;
2588  }
2589  /* the base struct contains all the data, easy peasy */
2593  break;
2594  }
2595 
2596  dlist_push_tail(&txn->changes, &change->node);
2597  txn->nentries_mem++;
2598 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
HeapTupleData * HeapTuple
Definition: htup.h:70
uint32 TransactionId
Definition: c.h:394
bool copied
Definition: snapshot.h:93
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@49 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:22
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@51 msg
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
struct ReorderBufferChange::@49::@50 tp
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:265
TransactionId * xip
Definition: snapshot.h:76
MemoryContext context
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:671
size_t Size
Definition: c.h:353
uint32 xcnt
Definition: snapshot.h:77
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:551
TransactionId * subxip
Definition: snapshot.h:88
int32 subxcnt
Definition: snapshot.h:89
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2339 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, ReplicationSlot::data, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2341 {
2342  Size restored = 0;
2343  XLogSegNo last_segno;
2344  dlist_mutable_iter cleanup_iter;
2345 
2348 
2349  /* free current entries, so we have memory for more */
2350  dlist_foreach_modify(cleanup_iter, &txn->changes)
2351  {
2353  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2354 
2355  dlist_delete(&cleanup->node);
2356  ReorderBufferReturnChange(rb, cleanup);
2357  }
2358  txn->nentries_mem = 0;
2359  Assert(dlist_is_empty(&txn->changes));
2360 
2361  XLByteToSeg(txn->final_lsn, last_segno);
2362 
2363  while (restored < max_changes_in_memory && *segno <= last_segno)
2364  {
2365  int readBytes;
2366  ReorderBufferDiskChange *ondisk;
2367 
2368  if (*fd == -1)
2369  {
2370  XLogRecPtr recptr;
2371  char path[MAXPGPATH];
2372 
2373  /* first time in */
2374  if (*segno == 0)
2375  {
2376  XLByteToSeg(txn->first_lsn, *segno);
2377  }
2378 
2379  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2380  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2381 
2382  /*
2383  * No need to care about TLIs here, only used during a single run,
2384  * so each LSN only maps to a specific WAL record.
2385  */
2386  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2388  (uint32) (recptr >> 32), (uint32) recptr);
2389 
2390  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2391  if (*fd < 0 && errno == ENOENT)
2392  {
2393  *fd = -1;
2394  (*segno)++;
2395  continue;
2396  }
2397  else if (*fd < 0)
2398  ereport(ERROR,
2400  errmsg("could not open file \"%s\": %m",
2401  path)));
2402 
2403  }
2404 
2405  /*
2406  * Read the statically sized part of a change which has information
2407  * about the total size. If we couldn't read a record, we're at the
2408  * end of this file.
2409  */
2411  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2412 
2413  /* eof */
2414  if (readBytes == 0)
2415  {
2417  *fd = -1;
2418  (*segno)++;
2419  continue;
2420  }
2421  else if (readBytes < 0)
2422  ereport(ERROR,
2424  errmsg("could not read from reorderbuffer spill file: %m")));
2425  else if (readBytes != sizeof(ReorderBufferDiskChange))
2426  ereport(ERROR,
2428  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2429  readBytes,
2430  (uint32) sizeof(ReorderBufferDiskChange))));
2431 
2432  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2433 
2435  sizeof(ReorderBufferDiskChange) + ondisk->size);
2436  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2437 
2438  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2439  ondisk->size - sizeof(ReorderBufferDiskChange));
2440 
2441  if (readBytes < 0)
2442  ereport(ERROR,
2444  errmsg("could not read from reorderbuffer spill file: %m")));
2445  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2446  ereport(ERROR,
2448  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2449  readBytes,
2450  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2451 
2452  /*
2453  * ok, read a full change from disk, now restore it into proper
2454  * in-memory format
2455  */
2456  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2457  restored++;
2458  }
2459 
2460  return restored;
2461 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
ReplicationSlotPersistentData data
Definition: slot.h:115
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:265
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2254
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:848
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:353
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:495
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:18
static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2604 of file reorderbuffer.c.

References Assert, cur, ReplicationSlot::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, unlink(), ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCleanupTXN().

2605 {
2606  XLogSegNo first;
2607  XLogSegNo cur;
2608  XLogSegNo last;
2609 
2612 
2613  XLByteToSeg(txn->first_lsn, first);
2614  XLByteToSeg(txn->final_lsn, last);
2615 
2616  /* iterate over all possible filenames, and delete them */
2617  for (cur = first; cur <= last; cur++)
2618  {
2619  char path[MAXPGPATH];
2620  XLogRecPtr recptr;
2621 
2622  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2623 
2624  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2626  (uint32) (recptr >> 32), (uint32) recptr);
2627  if (unlink(path) != 0 && errno != ENOENT)
2628  ereport(ERROR,
2630  errmsg("could not remove file \"%s\": %m", path)));
2631  }
2632 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct cursor * cur
Definition: ecpg.c:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:265
XLogRecPtr final_lsn
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:495
void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 394 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBuffer::cached_changes, ReorderBufferChange::data, dlist_push_head(), max_cached_changes, ReorderBufferChange::msg, ReorderBufferChange::node, ReorderBuffer::nr_cached_changes, NULL, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

395 {
396  /* free contained data */
397  switch (change->action)
398  {
403  if (change->data.tp.newtuple)
404  {
405  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
406  change->data.tp.newtuple = NULL;
407  }
408 
409  if (change->data.tp.oldtuple)
410  {
411  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
412  change->data.tp.oldtuple = NULL;
413  }
414  break;
416  if (change->data.msg.prefix != NULL)
417  pfree(change->data.msg.prefix);
418  change->data.msg.prefix = NULL;
419  if (change->data.msg.message != NULL)
420  pfree(change->data.msg.message);
421  change->data.msg.message = NULL;
422  break;
424  if (change->data.snapshot)
425  {
426  ReorderBufferFreeSnap(rb, change->data.snapshot);
427  change->data.snapshot = NULL;
428  }
429  break;
430  /* no data in addition to the struct itself */
434  break;
435  }
436 
437  /* check whether to put into the slab cache */
439  {
440  rb->nr_cached_changes++;
441  dlist_push_head(&rb->cached_changes, &change->node);
443  VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
444  }
445  else
446  {
447  pfree(change);
448  }
449 }
dlist_head cached_changes
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:300
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
union ReorderBufferChange::@49 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@49::@51 msg
void pfree(void *pointer)
Definition: mcxt.c:992
struct ReorderBufferChange::@49::@50 tp
Size nr_cached_changes
static const Size max_cached_changes
#define NULL
Definition: c.h:226
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 511 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, ReorderBuffer::cached_tuplebufs, max_cached_tuplebufs, MaxHeapTupleSize, ReorderBufferTupleBuf::node, ReorderBuffer::nr_cached_tuplebufs, pfree(), slist_push_head(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferReturnChange().

512 {
513  /* check whether to put into the slab cache, oversized tuples never are */
514  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
516  {
517  rb->nr_cached_tuplebufs++;
518  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
521  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
523  }
524  else
525  {
526  pfree(tuple);
527  }
528 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static const Size max_cached_tuplebufs
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:992
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
Size nr_cached_tuplebufs
slist_head cached_tuplebufs
static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 324 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_transactions, dlist_push_head(), hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, max_cached_transactions, ReorderBufferTXN::node, ReorderBuffer::nr_cached_transactions, NULL, pfree(), ReorderBufferTXN::tuplecid_hash, VALGRIND_MAKE_MEM_DEFINED, VALGRIND_MAKE_MEM_UNDEFINED, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

325 {
326  /* clean the lookup cache if we were cached (quite likely) */
327  if (rb->by_txn_last_xid == txn->xid)
328  {
330  rb->by_txn_last_txn = NULL;
331  }
332 
333  /* free data that's contained */
334 
335  if (txn->tuplecid_hash != NULL)
336  {
338  txn->tuplecid_hash = NULL;
339  }
340 
341  if (txn->invalidations)
342  {
343  pfree(txn->invalidations);
344  txn->invalidations = NULL;
345  }
346 
347  /* check whether to put into the slab cache */
349  {
353  VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
354  }
355  else
356  {
357  pfree(txn);
358  }
359 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:300
TransactionId by_txn_last_xid
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
void pfree(void *pointer)
Definition: mcxt.c:992
Size nr_cached_transactions
dlist_head cached_transactions
#define InvalidTransactionId
Definition: transam.h:31
static const Size max_cached_transactions
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
#define NULL
Definition: c.h:226
SharedInvalidationMessage * invalidations
static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2177 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2179 {
2180  ReorderBufferDiskChange *ondisk;
2181  Size sz = sizeof(ReorderBufferDiskChange);
2182 
2184 
2185  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2186  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2187 
2188  switch (change->action)
2189  {
2190  /* fall through these, they're all similar enough */
2195  {
2196  char *data;
2197  ReorderBufferTupleBuf *oldtup,
2198  *newtup;
2199  Size oldlen = 0;
2200  Size newlen = 0;
2201 
2202  oldtup = change->data.tp.oldtuple;
2203  newtup = change->data.tp.newtuple;
2204 
2205  if (oldtup)
2206  {
2207  sz += sizeof(HeapTupleData);
2208  oldlen = oldtup->tuple.t_len;
2209  sz += oldlen;
2210  }
2211 
2212  if (newtup)
2213  {
2214  sz += sizeof(HeapTupleData);
2215  newlen = newtup->tuple.t_len;
2216  sz += newlen;
2217  }
2218 
2219  /* make sure we have enough space */
2221 
2222  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2223  /* might have been reallocated above */
2224  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2225 
2226  if (oldlen)
2227  {
2228  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2229  data += sizeof(HeapTupleData);
2230 
2231  memcpy(data, oldtup->tuple.t_data, oldlen);
2232  data += oldlen;
2233  }
2234 
2235  if (newlen)
2236  {
2237  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2238  data += sizeof(HeapTupleData);
2239 
2240  memcpy(data, newtup->tuple.t_data, newlen);
2241  data += newlen;
2242  }
2243  break;
2244  }
2246  {
2247  char *data;
2248  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2249 
2250  sz += prefix_size + change->data.msg.message_size +
2251  sizeof(Size) + sizeof(Size);
2253 
2254  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2255 
2256  /* might have been reallocated above */
2257  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2258 
2259  /* write the prefix including the size */
2260  memcpy(data, &prefix_size, sizeof(Size));
2261  data += sizeof(Size);
2262  memcpy(data, change->data.msg.prefix,
2263  prefix_size);
2264  data += prefix_size;
2265 
2266  /* write the message including the size */
2267  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2268  data += sizeof(Size);
2269  memcpy(data, change->data.msg.message,
2270  change->data.msg.message_size);
2271  data += change->data.msg.message_size;
2272 
2273  break;
2274  }
2276  {
2277  Snapshot snap;
2278  char *data;
2279 
2280  snap = change->data.snapshot;
2281 
2282  sz += sizeof(SnapshotData) +
2283  sizeof(TransactionId) * snap->xcnt +
2284  sizeof(TransactionId) * snap->subxcnt
2285  ;
2286 
2287  /* make sure we have enough space */
2289  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2290  /* might have been reallocated above */
2291  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2292 
2293  memcpy(data, snap, sizeof(SnapshotData));
2294  data += sizeof(SnapshotData);
2295 
2296  if (snap->xcnt)
2297  {
2298  memcpy(data, snap->xip,
2299  sizeof(TransactionId) * snap->xcnt);
2300  data += sizeof(TransactionId) * snap->xcnt;
2301  }
2302 
2303  if (snap->subxcnt)
2304  {
2305  memcpy(data, snap->subxip,
2306  sizeof(TransactionId) * snap->subxcnt);
2307  data += sizeof(TransactionId) * snap->subxcnt;
2308  }
2309  break;
2310  }
2314  /* ReorderBufferChange contains everything important */
2315  break;
2316  }
2317 
2318  ondisk->size = sz;
2319 
2320  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2321  {
2322  int save_errno = errno;
2323 
2325  errno = save_errno;
2326  ereport(ERROR,
2328  errmsg("could not write to data file for XID %u: %m",
2329  txn->xid)));
2330  }
2331 
2332  Assert(ondisk->change.action == change->action);
2333 }
uint32 TransactionId
Definition: c.h:394
#define write(a, b, c)
Definition: win32.h:19
union ReorderBufferChange::@49 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:67
struct ReorderBufferChange::@49::@51 msg
struct ReorderBufferChange::@49::@50 tp
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId * xip
Definition: snapshot.h:76
int CloseTransientFile(int fd)
Definition: fd.c:2254
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:671
size_t Size
Definition: c.h:353
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:77
int errmsg(const char *fmt,...)
Definition: elog.c:797
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:88
int32 subxcnt
Definition: snapshot.h:89
static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2061 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2062 {
2063  if (!rb->outbufsize)
2064  {
2065  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2066  rb->outbufsize = sz;
2067  }
2068  else if (rb->outbufsize < sz)
2069  {
2070  rb->outbuf = repalloc(rb->outbuf, sz);
2071  rb->outbufsize = sz;
2072  }
2073 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1021
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2096 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, ReplicationSlot::data, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferTXN::subtxns, ReorderBufferTXN::xid, XLByteInSeg, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

2097 {
2098  dlist_iter subtxn_i;
2099  dlist_mutable_iter change_i;
2100  int fd = -1;
2101  XLogSegNo curOpenSegNo = 0;
2102  Size spilled = 0;
2103  char path[MAXPGPATH];
2104 
2105  elog(DEBUG2, "spill %u changes in XID %u to disk",
2106  (uint32) txn->nentries_mem, txn->xid);
2107 
2108  /* do the same to all child TXs */
2109  dlist_foreach(subtxn_i, &txn->subtxns)
2110  {
2111  ReorderBufferTXN *subtxn;
2112 
2113  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2114  ReorderBufferSerializeTXN(rb, subtxn);
2115  }
2116 
2117  /* serialize changestream */
2118  dlist_foreach_modify(change_i, &txn->changes)
2119  {
2120  ReorderBufferChange *change;
2121 
2122  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2123 
2124  /*
2125  * store in segment in which it belongs by start lsn, don't split over
2126  * multiple segments tho
2127  */
2128  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2129  {
2130  XLogRecPtr recptr;
2131 
2132  if (fd != -1)
2133  CloseTransientFile(fd);
2134 
2135  XLByteToSeg(change->lsn, curOpenSegNo);
2136  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2137 
2138  /*
2139  * No need to care about TLIs here, only used during a single run,
2140  * so each LSN only maps to a specific WAL record.
2141  */
2142  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2144  (uint32) (recptr >> 32), (uint32) recptr);
2145 
2146  /* open segment, create it if necessary */
2147  fd = OpenTransientFile(path,
2148  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2149  S_IRUSR | S_IWUSR);
2150 
2151  if (fd < 0)
2152  ereport(ERROR,
2154  errmsg("could not open file \"%s\": %m",
2155  path)));
2156  }
2157 
2158  ReorderBufferSerializeChange(rb, txn, fd, change);
2159  dlist_delete(&change->node);
2160  ReorderBufferReturnChange(rb, change);
2161 
2162  spilled++;
2163  }
2164 
2165  Assert(spilled == txn->nentries_mem);
2166  Assert(dlist_is_empty(&txn->changes));
2167  txn->nentries_mem = 0;
2168 
2169  if (fd != -1)
2170  CloseTransientFile(fd);
2171 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReplicationSlotPersistentData data
Definition: slot.h:115
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:265
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2254
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:353
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:495
#define elog
Definition: elog.h:219
#define XLByteInSeg(xlrp, logSegNo)
void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1898 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

1900 {
1901  ReorderBufferTXN *txn;
1902  bool is_new;
1903 
1904  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1905  Assert(txn->base_snapshot == NULL);
1906  Assert(snap != NULL);
1907 
1908  txn->base_snapshot = snap;
1909  txn->base_snapshot_lsn = lsn;
1910 }
Snapshot base_snapshot
XLogRecPtr base_snapshot_lsn
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 735 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

736 {
738 }
XLogRecPtr current_restart_decoding_lsn
static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2726 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, NULL, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

2728 {
2729  ReorderBufferToastEnt *ent;
2730  ReorderBufferTupleBuf *newtup;
2731  bool found;
2732  int32 chunksize;
2733  bool isnull;
2734  Pointer chunk;
2735  TupleDesc desc = RelationGetDescr(relation);
2736  Oid chunk_id;
2737  int32 chunk_seq;
2738 
2739  if (txn->toast_hash == NULL)
2740  ReorderBufferToastInitHash(rb, txn);
2741 
2742  Assert(IsToastRelation(relation));
2743 
2744  newtup = change->data.tp.newtuple;
2745  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2746  Assert(!isnull);
2747  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2748  Assert(!isnull);
2749 
2750  ent = (ReorderBufferToastEnt *)
2751  hash_search(txn->toast_hash,
2752  (void *) &chunk_id,
2753  HASH_ENTER,
2754  &found);
2755 
2756  if (!found)
2757  {
2758  Assert(ent->chunk_id == chunk_id);
2759  ent->num_chunks = 0;
2760  ent->last_chunk_seq = 0;
2761  ent->size = 0;
2762  ent->reconstructed = NULL;
2763  dlist_init(&ent->chunks);
2764 
2765  if (chunk_seq != 0)
2766  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2767  chunk_seq, chunk_id);
2768  }
2769  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2770  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2771  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2772 
2773  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2774  Assert(!isnull);
2775 
2776  /* calculate size so we can allocate the right size at once later */
2777  if (!VARATT_IS_EXTENDED(chunk))
2778  chunksize = VARSIZE(chunk) - VARHDRSZ;
2779  else if (VARATT_IS_SHORT(chunk))
2780  /* could happen due to heap_form_tuple doing its thing */
2781  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2782  else
2783  elog(ERROR, "unexpected type of toast chunk");
2784 
2785  ent->size += chunksize;
2786  ent->last_chunk_seq = chunk_seq;
2787  ent->num_chunks++;
2788  dlist_push_tail(&ent->chunks, &change->node);
2789 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define DatumGetInt32(X)
Definition: postgres.h:480
#define RelationGetDescr(relation)
Definition: rel.h:425
#define VARHDRSZ_SHORT
Definition: postgres.h:269
#define VARSIZE(PTR)
Definition: postgres.h:306
#define VARHDRSZ
Definition: c.h:441
#define DatumGetObjectId(X)
Definition: postgres.h:508
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@49 data
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:253
char * Pointer
Definition: c.h:242
struct ReorderBufferChange::@49::@50 tp
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:327
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:308
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:328
#define DatumGetPointer(X)
Definition: postgres.h:557
#define elog
Definition: elog.h:219
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2705 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, NULL, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2706 {
2707  HASHCTL hash_ctl;
2708 
2709  Assert(txn->toast_hash == NULL);
2710 
2711  memset(&hash_ctl, 0, sizeof(hash_ctl));
2712  hash_ctl.keysize = sizeof(Oid);
2713  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2714  hash_ctl.hcxt = rb->context;
2715  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2717 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2799 of file reorderbuffer.c.

References Assert, tupleDesc::attrs, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), tupleDesc::natts, NULL, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

2801 {
2802  TupleDesc desc;
2803  int natt;
2804  Datum *attrs;
2805  bool *isnull;
2806  bool *free;
2807  HeapTuple tmphtup;
2808  Relation toast_rel;
2809  TupleDesc toast_desc;
2810  MemoryContext oldcontext;
2811  ReorderBufferTupleBuf *newtup;
2812 
2813  /* no toast tuples changed */
2814  if (txn->toast_hash == NULL)
2815  return;
2816 
2817  oldcontext = MemoryContextSwitchTo(rb->context);
2818 
2819  /* we should only have toast tuples in an INSERT or UPDATE */
2820  Assert(change->data.tp.newtuple);
2821 
2822  desc = RelationGetDescr(relation);
2823 
2824  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2825  toast_desc = RelationGetDescr(toast_rel);
2826 
2827  /* should we allocate from stack instead? */
2828  attrs = palloc0(sizeof(Datum) * desc->natts);
2829  isnull = palloc0(sizeof(bool) * desc->natts);
2830  free = palloc0(sizeof(bool) * desc->natts);
2831 
2832  newtup = change->data.tp.newtuple;
2833 
2834  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2835 
2836  for (natt = 0; natt < desc->natts; natt++)
2837  {
2838  Form_pg_attribute attr = desc->attrs[natt];
2839  ReorderBufferToastEnt *ent;
2840  struct varlena *varlena;
2841 
2842  /* va_rawsize is the size of the original datum -- including header */
2843  struct varatt_external toast_pointer;
2844  struct varatt_indirect redirect_pointer;
2845  struct varlena *new_datum = NULL;
2846  struct varlena *reconstructed;
2847  dlist_iter it;
2848  Size data_done = 0;
2849 
2850  /* system columns aren't toasted */
2851  if (attr->attnum < 0)
2852  continue;
2853 
2854  if (attr->attisdropped)
2855  continue;
2856 
2857  /* not a varlena datatype */
2858  if (attr->attlen != -1)
2859  continue;
2860 
2861  /* no data */
2862  if (isnull[natt])
2863  continue;
2864 
2865  /* ok, we know we have a toast datum */
2866  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2867 
2868  /* no need to do anything if the tuple isn't external */
2869  if (!VARATT_IS_EXTERNAL(varlena))
2870  continue;
2871 
2872  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2873 
2874  /*
2875  * Check whether the toast tuple changed, replace if so.
2876  */
2877  ent = (ReorderBufferToastEnt *)
2878  hash_search(txn->toast_hash,
2879  (void *) &toast_pointer.va_valueid,
2880  HASH_FIND,
2881  NULL);
2882  if (ent == NULL)
2883  continue;
2884 
2885  new_datum =
2886  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2887 
2888  free[natt] = true;
2889 
2890  reconstructed = palloc0(toast_pointer.va_rawsize);
2891 
2892  ent->reconstructed = reconstructed;
2893 
2894  /* stitch toast tuple back together from its parts */
2895  dlist_foreach(it, &ent->chunks)
2896  {
2897  bool isnull;
2898  ReorderBufferChange *cchange;
2899  ReorderBufferTupleBuf *ctup;
2900  Pointer chunk;
2901 
2902  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2903  ctup = cchange->data.tp.newtuple;
2904  chunk = DatumGetPointer(
2905  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2906 
2907  Assert(!isnull);
2908  Assert(!VARATT_IS_EXTERNAL(chunk));
2909  Assert(!VARATT_IS_SHORT(chunk));
2910 
2911  memcpy(VARDATA(reconstructed) + data_done,
2912  VARDATA(chunk),
2913  VARSIZE(chunk) - VARHDRSZ);
2914  data_done += VARSIZE(chunk) - VARHDRSZ;
2915  }
2916  Assert(data_done == toast_pointer.va_extsize);
2917 
2918  /* make sure its marked as compressed or not */
2919  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2920  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2921  else
2922  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2923 
2924  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2925  redirect_pointer.pointer = reconstructed;
2926 
2928  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2929  sizeof(redirect_pointer));
2930 
2931  attrs[natt] = PointerGetDatum(new_datum);
2932  }
2933 
2934  /*
2935  * Build tuple in separate memory & copy tuple back into the tuplebuf
2936  * passed to the output plugin. We can't directly heap_fill_tuple() into
2937  * the tuplebuf because attrs[] will point back into the current content.
2938  */
2939  tmphtup = heap_form_tuple(desc, attrs, isnull);
2940  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2941  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2942 
2943  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2944  newtup->tuple.t_len = tmphtup->t_len;
2945 
2946  /*
2947  * free resources we won't further need, more persistent stuff will be
2948  * free'd in ReorderBufferToastReset().
2949  */
2950  RelationClose(toast_rel);
2951  pfree(tmphtup);
2952  for (natt = 0; natt < desc->natts; natt++)
2953  {
2954  if (free[natt])
2955  pfree(DatumGetPointer(attrs[natt]));
2956  }
2957  pfree(attrs);
2958  pfree(free);
2959  pfree(isnull);
2960 
2961  MemoryContextSwitchTo(oldcontext);
2962 }
#define VARDATA(PTR)
Definition: postgres.h:305
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define RelationGetDescr(relation)
Definition: rel.h:425
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define VARSIZE(PTR)
Definition: postgres.h:306
#define PointerGetDatum(X)
Definition: postgres.h:564
#define VARHDRSZ
Definition: c.h:441
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
union ReorderBufferChange::@49 data
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
Form_pg_class rd_rel
Definition: rel.h:113
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:313
int natts
Definition: tupdesc.h:73
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:316
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:992
char * Pointer
Definition: c.h:242
struct ReorderBufferChange::@49::@50 tp
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:327
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:334
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void RelationClose(Relation relation)
Definition: relcache.c:2155
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:920
uintptr_t Datum
Definition: postgres.h:374
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:60
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
size_t Size
Definition: c.h:353
#define DatumGetPointer(X)
Definition: postgres.h:557
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:332
Definition: c.h:435
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:330
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2066
static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2968 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, NULL, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

2969 {
2970  HASH_SEQ_STATUS hstat;
2971  ReorderBufferToastEnt *ent;
2972 
2973  if (txn->toast_hash == NULL)
2974  return;
2975 
2976  /* sequentially walk over the hash and free everything */
2977  hash_seq_init(&hstat, txn->toast_hash);
2978  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2979  {
2980  dlist_mutable_iter it;
2981 
2982  if (ent->reconstructed != NULL)
2983  pfree(ent->reconstructed);
2984 
2985  dlist_foreach_modify(it, &ent->chunks)
2986  {
2987  ReorderBufferChange *change =
2989 
2990  dlist_delete(&change->node);
2991  ReorderBufferReturnChange(rb, change);
2992  }
2993  }
2994 
2995  hash_destroy(txn->toast_hash);
2996  txn->toast_hash = NULL;
2997 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:992
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:226
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 537 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, NULL, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

539 {
540  ReorderBufferTXN *txn;
542  bool found;
543 
545  Assert(!create || lsn != InvalidXLogRecPtr);
546 
547  /*
548  * Check the one-entry lookup cache first
549  */
551  rb->by_txn_last_xid == xid)
552  {
553  txn = rb->by_txn_last_txn;
554 
555  if (txn != NULL)
556  {
557  /* found it, and it's valid */
558  if (is_new)
559  *is_new = false;
560  return txn;
561  }
562 
563  /*
564  * cached as non-existent, and asked not to create? Then nothing else
565  * to do.
566  */
567  if (!create)
568  return NULL;
569  /* otherwise fall through to create it */
570  }
571 
572  /*
573  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
574  * to create an entry.
575  */
576 
577  /* search the lookup table */
578  ent = (ReorderBufferTXNByIdEnt *)
579  hash_search(rb->by_txn,
580  (void *) &xid,
581  create ? HASH_ENTER : HASH_FIND,
582  &found);
583  if (found)
584  txn = ent->txn;
585  else if (create)
586  {
587  /* initialize the new entry, if creation was requested */
588  Assert(ent != NULL);
589 
590  ent->txn = ReorderBufferGetTXN(rb);
591  ent->txn->xid = xid;
592  txn = ent->txn;
593  txn->first_lsn = lsn;
595 
596  if (create_as_top)
597  {
598  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
599  AssertTXNLsnOrder(rb);
600  }
601  }
602  else
603  txn = NULL; /* not found and not asked to create */
604 
605  /* update cache */
606  rb->by_txn_last_xid = xid;
607  rb->by_txn_last_txn = txn;
608 
609  if (is_new)
610  *is_new = !found;
611 
612  Assert(!create || txn != NULL);
613  return txn;
614 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
TransactionId xid
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
ReorderBufferTXN * txn
Definition: reorderbuffer.c:81
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2031 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

2032 {
2033  ReorderBufferTXN *txn;
2034 
2035  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2036  false);
2037 
2038  /* transaction isn't known yet, ergo no snapshot */
2039  if (txn == NULL)
2040  return false;
2041 
2042  /*
2043  * TODO: It would be a nice improvement if we would check the toplevel
2044  * transaction in subtransactions, but we'd need to keep track of a bit
2045  * more state.
2046  */
2047  return txn->base_snapshot != NULL;
2048 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2015 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn().

2016 {
2017  ReorderBufferTXN *txn;
2018 
2019  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2020  false);
2021  if (txn == NULL)
2022  return false;
2023 
2024  return txn->has_catalog_changes;
2025 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2000 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

2002 {
2003  ReorderBufferTXN *txn;
2004 
2005  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2006 
2007  txn->has_catalog_changes = true;
2008 }
#define NULL
Definition: c.h:226
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 3274 of file reorderbuffer.c.

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy, ItemPointerGetBlockNumber, MAIN_FORKNUM, NULL, ReorderBufferTupleCidKey::relnode, HeapTupleData::t_self, HeapTupleData::t_tableOid, ReorderBufferTupleCidKey::tid, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

3278 {
3281  ForkNumber forkno;
3282  BlockNumber blockno;
3283  bool updated_mapping = false;
3284 
3285  /* be careful about padding */
3286  memset(&key, 0, sizeof(key));
3287 
3288  Assert(!BufferIsLocal(buffer));
3289 
3290  /*
3291  * get relfilenode from the buffer, no convenient way to access it other
3292  * than that.
3293  */
3294  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3295 
3296  /* tuples can only be in the main fork */
3297  Assert(forkno == MAIN_FORKNUM);
3298  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3299 
3300  ItemPointerCopy(&htup->t_self,
3301  &key.tid);
3302 
3303 restart:
3304  ent = (ReorderBufferTupleCidEnt *)
3305  hash_search(tuplecid_data,
3306  (void *) &key,
3307  HASH_FIND,
3308  NULL);
3309 
3310  /*
3311  * failed to find a mapping, check whether the table was rewritten and
3312  * apply mapping if so, but only do that once - there can be no new
3313  * mappings while we are in here since we have to hold a lock on the
3314  * relation.
3315  */
3316  if (ent == NULL && !updated_mapping)
3317  {
3318  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3319  /* now check but don't update for a mapping again */
3320  updated_mapping = true;
3321  goto restart;
3322  }
3323  else if (ent == NULL)
3324  return false;
3325 
3326  if (cmin)
3327  *cmin = ent->cmin;
3328  if (cmax)
3329  *cmax = ent->cmax;
3330  return true;
3331 }
uint32 BlockNumber
Definition: block.h:31
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66
ForkNumber
Definition: relpath.h:24
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define BufferIsLocal(buffer)
Definition: buf.h:37
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2609
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
void StartupReorderBuffer ( void  )

Definition at line 2639 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, NULL, PANIC, ReadDir(), ReplicationSlotValidateName(), and unlink().

Referenced by StartupXLOG().

2640 {
2641  DIR *logical_dir;
2642  struct dirent *logical_de;
2643 
2644  DIR *spill_dir;
2645  struct dirent *spill_de;
2646 
2647  logical_dir = AllocateDir("pg_replslot");
2648  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2649  {
2650  struct stat statbuf;
2651  char path[MAXPGPATH];
2652 
2653  if (strcmp(logical_de->d_name, ".") == 0 ||
2654  strcmp(logical_de->d_name, "..") == 0)
2655  continue;
2656 
2657  /* if it cannot be a slot, skip the directory */
2658  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2659  continue;
2660 
2661  /*
2662  * ok, has to be a surviving logical slot, iterate and delete
2663  * everything starting with xid-*
2664  */
2665  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2666 
2667  /* we're only creating directories here, skip if it's not our's */
2668  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2669  continue;
2670 
2671  spill_dir = AllocateDir(path);
2672  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2673  {
2674  if (strcmp(spill_de->d_name, ".") == 0 ||
2675  strcmp(spill_de->d_name, "..") == 0)
2676  continue;
2677 
2678  /* only look at names that can be ours */
2679  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2680  {
2681  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2682  spill_de->d_name);
2683 
2684  if (unlink(path) != 0)
2685  ereport(PANIC,
2687  errmsg("could not remove file \"%s\": %m",
2688  path)));
2689  }
2690  }
2691  FreeDir(spill_dir);
2692  }
2693  FreeDir(logical_dir);
2694 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2284
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:226
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2350
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define lstat(path, sb)
Definition: win32.h:272
int FreeDir(DIR *dir)
Definition: fd.c:2393
static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static