PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct
ReorderBufferTXNByIdEnt 
ReorderBufferTXNByIdEnt
 
typedef struct
ReorderBufferTupleCidKey 
ReorderBufferTupleCidKey
 
typedef struct
ReorderBufferTupleCidEnt 
ReorderBufferTupleCidEnt
 
typedef struct
ReorderBufferIterTXNEntry 
ReorderBufferIterTXNEntry
 
typedef struct
ReorderBufferIterTXNState 
ReorderBufferIterTXNState
 
typedef struct
ReorderBufferToastEnt 
ReorderBufferToastEnt
 
typedef struct
ReorderBufferDiskChange 
ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNStateReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const void *a_p, const void *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 
static const Size max_cached_tuplebufs = 4096 * 2
 

Typedef Documentation

Function Documentation

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3032 of file reorderbuffer.c.

References Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, NULL, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3033 {
3034  char path[MAXPGPATH];
3035  int fd;
3036  int readBytes;
3038 
3039  sprintf(path, "pg_logical/mappings/%s", fname);
3040  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041  if (fd < 0)
3042  ereport(ERROR,
3044  errmsg("could not open file \"%s\": %m", path)));
3045 
3046  while (true)
3047  {
3050  ReorderBufferTupleCidEnt *new_ent;
3051  bool found;
3052 
3053  /* be careful about padding */
3054  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055 
3056  /* read all mappings till the end of the file */
3058  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3060 
3061  if (readBytes < 0)
3062  ereport(ERROR,
3064  errmsg("could not read file \"%s\": %m",
3065  path)));
3066  else if (readBytes == 0) /* EOF */
3067  break;
3068  else if (readBytes != sizeof(LogicalRewriteMappingData))
3069  ereport(ERROR,
3071  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072  path, readBytes,
3073  (int32) sizeof(LogicalRewriteMappingData))));
3074 
3075  key.relnode = map.old_node;
3076  ItemPointerCopy(&map.old_tid,
3077  &key.tid);
3078 
3079 
3080  ent = (ReorderBufferTupleCidEnt *)
3081  hash_search(tuplecid_data,
3082  (void *) &key,
3083  HASH_FIND,
3084  NULL);
3085 
3086  /* no existing mapping, no need to update */
3087  if (!ent)
3088  continue;
3089 
3090  key.relnode = map.new_node;
3091  ItemPointerCopy(&map.new_tid,
3092  &key.tid);
3093 
3094  new_ent = (ReorderBufferTupleCidEnt *)
3095  hash_search(tuplecid_data,
3096  (void *) &key,
3097  HASH_ENTER,
3098  &found);
3099 
3100  if (found)
3101  {
3102  /*
3103  * Make sure the existing mapping makes sense. We sometime update
3104  * old records that did not yet have a cmax (e.g. pg_class' own
3105  * entry while rewriting it) during rewrites, so allow that.
3106  */
3107  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109  }
3110  else
3111  {
3112  /* update mapping */
3113  new_ent->cmin = ent->cmin;
3114  new_ent->cmax = ent->cmax;
3115  new_ent->combocid = ent->combocid;
3116  }
3117  }
3118 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
signed int int32
Definition: c.h:256
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvalidCommandId
Definition: c.h:414
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define read(a, b, c)
Definition: win32.h:18
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
ItemPointerData old_tid
Definition: rewriteheap.h:39
static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 651 of file reorderbuffer.c.

References Assert, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by ReorderBufferGetOldestTXN(), and ReorderBufferTXNByXid().

652 {
653 #ifdef USE_ASSERT_CHECKING
654  dlist_iter iter;
655  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
656 
657  dlist_foreach(iter, &rb->toplevel_by_lsn)
658  {
659  ReorderBufferTXN *cur_txn;
660 
661  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
662  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
663 
664  if (cur_txn->end_lsn != InvalidXLogRecPtr)
665  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
666 
667  if (prev_first_lsn != InvalidXLogRecPtr)
668  Assert(prev_first_lsn < cur_txn->first_lsn);
669 
670  Assert(!cur_txn->is_known_as_subxact);
671  prev_first_lsn = cur_txn->first_lsn;
672  }
673 #endif
674 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
XLogRecPtr end_lsn
static int file_sort_by_lsn ( const void *  a_p,
const void *  b_p 
)
static

Definition at line 3135 of file reorderbuffer.c.

References RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3136 {
3137  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139 
3140  if (a->lsn < b->lsn)
3141  return -1;
3142  else if (a->lsn > b->lsn)
3143  return 1;
3144  return 0;
3145 }
void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1683 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, NULL, ReorderBufferCleanupTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

1684 {
1685  ReorderBufferTXN *txn;
1686 
1687  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1688  false);
1689 
1690  /* unknown, nothing to remove */
1691  if (txn == NULL)
1692  return;
1693 
1694  /* cosmetic... */
1695  txn->final_lsn = lsn;
1696 
1697  /* remove potential on-disk data, and deallocate */
1698  ReorderBufferCleanupTXN(rb, txn);
1699 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1709 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1710 {
1711  dlist_mutable_iter it;
1712 
1713  /*
1714  * Iterate through all (potential) toplevel TXNs and abort all that are
1715  * older than what possibly can be running. Once we've found the first
1716  * that is alive we stop, there might be some that acquired an xid earlier
1717  * but started writing later, but it's unlikely and they will cleaned up
1718  * in a later call to ReorderBufferAbortOld().
1719  */
1721  {
1722  ReorderBufferTXN *txn;
1723 
1724  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1725 
1726  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1727  {
1728  elog(DEBUG1, "aborting old transaction %u", txn->xid);
1729 
1730  /* remove potential on-disk data, and deallocate this tx */
1731  ReorderBufferCleanupTXN(rb, txn);
1732  }
1733  else
1734  return;
1735  }
1736 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog
Definition: elog.h:219
void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 1921 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1924 {
1925  ReorderBufferTXN *txn;
1926 
1927  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1928 
1929  if (txn->ninvalidations != 0)
1930  elog(ERROR, "only ever add one set of invalidations");
1931 
1932  Assert(nmsgs > 0);
1933 
1934  txn->ninvalidations = nmsgs;
1937  sizeof(SharedInvalidationMessage) * nmsgs);
1938  memcpy(txn->invalidations, msgs,
1939  sizeof(SharedInvalidationMessage) * nmsgs);
1940 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
#define elog
Definition: elog.h:219
void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1877 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1879 {
1881 
1882  change->data.command_id = cid;
1884 
1885  ReorderBufferQueueChange(rb, xid, lsn, change);
1886 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1893 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, and ReorderBufferTXN::tuplecids.

Referenced by SnapBuildProcessNewCid().

1897 {
1899  ReorderBufferTXN *txn;
1900 
1901  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1902 
1903  change->data.tuplecid.node = node;
1904  change->data.tuplecid.tid = tid;
1905  change->data.tuplecid.cmin = cmin;
1906  change->data.tuplecid.cmax = cmax;
1907  change->data.tuplecid.combocid = combocid;
1908  change->lsn = lsn;
1910 
1911  dlist_push_tail(&txn->tuplecids, &change->node);
1912  txn->ntuplecids++;
1913 }
struct ReorderBufferChange::@86::@89 tuplecid
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids
void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1837 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

1839 {
1841 
1842  change->data.snapshot = snap;
1844 
1845  ReorderBufferQueueChange(rb, xid, lsn, change);
1846 }
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 224 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_tuplebufs, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, NULL, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), slist_init(), ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

225 {
227  HASHCTL hash_ctl;
228  MemoryContext new_ctx;
229 
230  /* allocate memory in own context, to have better accountability */
232  "ReorderBuffer",
234 
235  buffer =
236  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
237 
238  memset(&hash_ctl, 0, sizeof(hash_ctl));
239 
240  buffer->context = new_ctx;
241 
242  buffer->change_context = SlabContextCreate(new_ctx,
243  "Change",
245  sizeof(ReorderBufferChange));
246 
247  buffer->txn_context = SlabContextCreate(new_ctx,
248  "TXN",
250  sizeof(ReorderBufferTXN));
251 
252  hash_ctl.keysize = sizeof(TransactionId);
253  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
254  hash_ctl.hcxt = buffer->context;
255 
256  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
258 
260  buffer->by_txn_last_txn = NULL;
261 
262  buffer->nr_cached_tuplebufs = 0;
263 
264  buffer->outbuf = NULL;
265  buffer->outbufsize = 0;
266 
268 
269  dlist_init(&buffer->toplevel_by_lsn);
270  slist_init(&buffer->cached_tuplebufs);
271 
272  return buffer;
273 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:397
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
MemoryContext change_context
static void slist_init(slist_head *head)
Definition: ilist.h:554
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:229
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:194
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext txn_context
slist_head cached_tuplebufs
void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 700 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeXactOp().

702 {
703  ReorderBufferTXN *txn;
704  ReorderBufferTXN *subtxn;
705  bool new_top;
706  bool new_sub;
707 
708  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
709  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
710 
711  if (new_sub)
712  {
713  /*
714  * we assign subtransactions to top level transaction even if we don't
715  * have data for it yet, assignment records frequently reference xids
716  * that have not yet produced any records. Knowing those aren't top
717  * level xids allows us to make processing cheaper in some places.
718  */
719  dlist_push_tail(&txn->subtxns, &subtxn->node);
720  txn->nsubtxns++;
721  }
722  else if (!subtxn->is_known_as_subxact)
723  {
724  subtxn->is_known_as_subxact = true;
725  Assert(subtxn->nsubtxns == 0);
726 
727  /* remove from lsn order list of top-level transactions */
728  dlist_delete(&subtxn->node);
729 
730  /* add to toplevel transaction */
731  dlist_push_tail(&txn->subtxns, &subtxn->node);
732  txn->nsubtxns++;
733  }
734  else if (new_top)
735  {
736  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
737  }
738 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:675
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1157 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1158 {
1159  dlist_iter iter;
1160  HASHCTL hash_ctl;
1161 
1162  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1163  return;
1164 
1165  memset(&hash_ctl, 0, sizeof(hash_ctl));
1166 
1167  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1168  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1169  hash_ctl.hcxt = rb->context;
1170 
1171  /*
1172  * create the hash with the exact number of to-be-stored tuplecids from
1173  * the start
1174  */
1175  txn->tuplecid_hash =
1176  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1178 
1179  dlist_foreach(iter, &txn->tuplecids)
1180  {
1183  bool found;
1184  ReorderBufferChange *change;
1185 
1186  change = dlist_container(ReorderBufferChange, node, iter.cur);
1187 
1189 
1190  /* be careful about padding */
1191  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1192 
1193  key.relnode = change->data.tuplecid.node;
1194 
1195  ItemPointerCopy(&change->data.tuplecid.tid,
1196  &key.tid);
1197 
1198  ent = (ReorderBufferTupleCidEnt *)
1200  (void *) &key,
1202  &found);
1203  if (!found)
1204  {
1205  ent->cmin = change->data.tuplecid.cmin;
1206  ent->cmax = change->data.tuplecid.cmax;
1207  ent->combocid = change->data.tuplecid.combocid;
1208  }
1209  else
1210  {
1211  Assert(ent->cmin == change->data.tuplecid.cmin);
1212  Assert(ent->cmax == InvalidCommandId ||
1213  ent->cmax == change->data.tuplecid.cmax);
1214 
1215  /*
1216  * if the tuple got valid in this transaction and now got deleted
1217  * we already have a valid cmin stored. The cmax will be
1218  * InvalidCommandId though.
1219  */
1220  ent->cmax = change->data.tuplecid.cmax;
1221  }
1222  }
1223 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
struct ReorderBufferChange::@86::@89 tuplecid
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Size entrysize
Definition: hsearch.h:73
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:414
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:675
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
static void ReorderBufferCheckSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2038 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2039 {
2040  /*
2041  * TODO: improve accounting so we cheaply can take subtransactions into
2042  * account here.
2043  */
2044  if (txn->nentries_mem >= max_changes_in_memory)
2045  {
2046  ReorderBufferSerializeTXN(rb, txn);
2047  Assert(txn->nentries_mem == 0);
2048  }
2049 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:675
static const Size max_changes_in_memory
static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1074 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1075 {
1076  bool found;
1077  dlist_mutable_iter iter;
1078 
1079  /* cleanup subtransactions & their changes */
1080  dlist_foreach_modify(iter, &txn->subtxns)
1081  {
1082  ReorderBufferTXN *subtxn;
1083 
1084  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1085 
1086  /*
1087  * Subtransactions are always associated to the toplevel TXN, even if
1088  * they originally were happening inside another subtxn, so we won't
1089  * ever recurse more than one level deep here.
1090  */
1091  Assert(subtxn->is_known_as_subxact);
1092  Assert(subtxn->nsubtxns == 0);
1093 
1094  ReorderBufferCleanupTXN(rb, subtxn);
1095  }
1096 
1097  /* cleanup changes in the toplevel txn */
1098  dlist_foreach_modify(iter, &txn->changes)
1099  {
1100  ReorderBufferChange *change;
1101 
1102  change = dlist_container(ReorderBufferChange, node, iter.cur);
1103 
1104  ReorderBufferReturnChange(rb, change);
1105  }
1106 
1107  /*
1108  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1109  * They are always stored in the toplevel transaction.
1110  */
1111  dlist_foreach_modify(iter, &txn->tuplecids)
1112  {
1113  ReorderBufferChange *change;
1114 
1115  change = dlist_container(ReorderBufferChange, node, iter.cur);
1117  ReorderBufferReturnChange(rb, change);
1118  }
1119 
1120  if (txn->base_snapshot != NULL)
1121  {
1123  txn->base_snapshot = NULL;
1125  }
1126 
1127  /*
1128  * Remove TXN from its containing list.
1129  *
1130  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1131  * parent's list of known subxacts; this leaves the parent's nsubxacts
1132  * count too high, but we don't care. Otherwise, we are deleting the TXN
1133  * from the LSN-ordered list of toplevel TXNs.
1134  */
1135  dlist_delete(&txn->node);
1136 
1137  /* now remove reference from buffer */
1138  hash_search(rb->by_txn,
1139  (void *) &txn->xid,
1140  HASH_REMOVE,
1141  &found);
1142  Assert(found);
1143 
1144  /* remove entries spilled to disk */
1145  if (txn->nentries != txn->nentries_mem)
1146  ReorderBufferRestoreCleanup(rb, txn);
1147 
1148  /* deallocate */
1149  ReorderBufferReturnTXN(rb, txn);
1150 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids
void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1312 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, NULL, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, and ReorderBufferTXN::tuplecid_hash.

Referenced by DecodeCommit().

1316 {
1317  ReorderBufferTXN *txn;
1318  volatile Snapshot snapshot_now;
1319  volatile CommandId command_id = FirstCommandId;
1320  bool using_subtxn;
1321  ReorderBufferIterTXNState *volatile iterstate = NULL;
1322 
1323  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1324  false);
1325 
1326  /* unknown transaction, nothing to replay */
1327  if (txn == NULL)
1328  return;
1329 
1330  txn->final_lsn = commit_lsn;
1331  txn->end_lsn = end_lsn;
1332  txn->commit_time = commit_time;
1333  txn->origin_id = origin_id;
1334  txn->origin_lsn = origin_lsn;
1335 
1336  /*
1337  * If this transaction didn't have any real changes in our database, it's
1338  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1339  * transferred its snapshot to this transaction if it had one and the
1340  * toplevel tx didn't.
1341  */
1342  if (txn->base_snapshot == NULL)
1343  {
1344  Assert(txn->ninvalidations == 0);
1345  ReorderBufferCleanupTXN(rb, txn);
1346  return;
1347  }
1348 
1349  snapshot_now = txn->base_snapshot;
1350 
1351  /* build data to be able to lookup the CommandIds of catalog tuples */
1353 
1354  /* setup the initial snapshot */
1355  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1356 
1357  /*
1358  * Decoding needs access to syscaches et al., which in turn use
1359  * heavyweight locks and such. Thus we need to have enough state around to
1360  * keep track of those. The easiest way is to simply use a transaction
1361  * internally. That also allows us to easily enforce that nothing writes
1362  * to the database by checking for xid assignments.
1363  *
1364  * When we're called via the SQL SRF there's already a transaction
1365  * started, so start an explicit subtransaction there.
1366  */
1367  using_subtxn = IsTransactionOrTransactionBlock();
1368 
1369  PG_TRY();
1370  {
1371  ReorderBufferChange *change;
1372  ReorderBufferChange *specinsert = NULL;
1373 
1374  if (using_subtxn)
1375  BeginInternalSubTransaction("replay");
1376  else
1378 
1379  rb->begin(rb, txn);
1380 
1381  iterstate = ReorderBufferIterTXNInit(rb, txn);
1382  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1383  {
1384  Relation relation = NULL;
1385  Oid reloid;
1386 
1387  switch (change->action)
1388  {
1390 
1391  /*
1392  * Confirmation for speculative insertion arrived. Simply
1393  * use as a normal record. It'll be cleaned up at the end
1394  * of INSERT processing.
1395  */
1396  Assert(specinsert->data.tp.oldtuple == NULL);
1397  change = specinsert;
1399 
1400  /* intentionally fall through */
1404  Assert(snapshot_now);
1405 
1406  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1407  change->data.tp.relnode.relNode);
1408 
1409  /*
1410  * Catalog tuple without data, emitted while catalog was
1411  * in the process of being rewritten.
1412  */
1413  if (reloid == InvalidOid &&
1414  change->data.tp.newtuple == NULL &&
1415  change->data.tp.oldtuple == NULL)
1416  goto change_done;
1417  else if (reloid == InvalidOid)
1418  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1419  relpathperm(change->data.tp.relnode,
1420  MAIN_FORKNUM));
1421 
1422  relation = RelationIdGetRelation(reloid);
1423 
1424  if (relation == NULL)
1425  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1426  reloid,
1427  relpathperm(change->data.tp.relnode,
1428  MAIN_FORKNUM));
1429 
1430  if (!RelationIsLogicallyLogged(relation))
1431  goto change_done;
1432 
1433  /*
1434  * For now ignore sequence changes entirely. Most of the
1435  * time they don't log changes using records we
1436  * understand, so it doesn't make sense to handle the few
1437  * cases we do.
1438  */
1439  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1440  goto change_done;
1441 
1442  /* user-triggered change */
1443  if (!IsToastRelation(relation))
1444  {
1445  ReorderBufferToastReplace(rb, txn, relation, change);
1446  rb->apply_change(rb, txn, relation, change);
1447 
1448  /*
1449  * Only clear reassembled toast chunks if we're sure
1450  * they're not required anymore. The creator of the
1451  * tuple tells us.
1452  */
1453  if (change->data.tp.clear_toast_afterwards)
1454  ReorderBufferToastReset(rb, txn);
1455  }
1456  /* we're not interested in toast deletions */
1457  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1458  {
1459  /*
1460  * Need to reassemble the full toasted Datum in
1461  * memory, to ensure the chunks don't get reused till
1462  * we're done remove it from the list of this
1463  * transaction's changes. Otherwise it will get
1464  * freed/reused while restoring spooled data from
1465  * disk.
1466  */
1467  dlist_delete(&change->node);
1468  ReorderBufferToastAppendChunk(rb, txn, relation,
1469  change);
1470  }
1471 
1472  change_done:
1473 
1474  /*
1475  * Either speculative insertion was confirmed, or it was
1476  * unsuccessful and the record isn't needed anymore.
1477  */
1478  if (specinsert != NULL)
1479  {
1480  ReorderBufferReturnChange(rb, specinsert);
1481  specinsert = NULL;
1482  }
1483 
1484  if (relation != NULL)
1485  {
1486  RelationClose(relation);
1487  relation = NULL;
1488  }
1489  break;
1490 
1492 
1493  /*
1494  * Speculative insertions are dealt with by delaying the
1495  * processing of the insert until the confirmation record
1496  * arrives. For that we simply unlink the record from the
1497  * chain, so it does not get freed/reused while restoring
1498  * spooled data from disk.
1499  *
1500  * This is safe in the face of concurrent catalog changes
1501  * because the relevant relation can't be changed between
1502  * speculative insertion and confirmation due to
1503  * CheckTableNotInUse() and locking.
1504  */
1505 
1506  /* clear out a pending (and thus failed) speculation */
1507  if (specinsert != NULL)
1508  {
1509  ReorderBufferReturnChange(rb, specinsert);
1510  specinsert = NULL;
1511  }
1512 
1513  /* and memorize the pending insertion */
1514  dlist_delete(&change->node);
1515  specinsert = change;
1516  break;
1517 
1519  rb->message(rb, txn, change->lsn, true,
1520  change->data.msg.prefix,
1521  change->data.msg.message_size,
1522  change->data.msg.message);
1523  break;
1524 
1526  /* get rid of the old */
1527  TeardownHistoricSnapshot(false);
1528 
1529  if (snapshot_now->copied)
1530  {
1531  ReorderBufferFreeSnap(rb, snapshot_now);
1532  snapshot_now =
1533  ReorderBufferCopySnap(rb, change->data.snapshot,
1534  txn, command_id);
1535  }
1536 
1537  /*
1538  * Restored from disk, need to be careful not to double
1539  * free. We could introduce refcounting for that, but for
1540  * now this seems infrequent enough not to care.
1541  */
1542  else if (change->data.snapshot->copied)
1543  {
1544  snapshot_now =
1545  ReorderBufferCopySnap(rb, change->data.snapshot,
1546  txn, command_id);
1547  }
1548  else
1549  {
1550  snapshot_now = change->data.snapshot;
1551  }
1552 
1553 
1554  /* and continue with the new one */
1555  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1556  break;
1557 
1559  Assert(change->data.command_id != InvalidCommandId);
1560 
1561  if (command_id < change->data.command_id)
1562  {
1563  command_id = change->data.command_id;
1564 
1565  if (!snapshot_now->copied)
1566  {
1567  /* we don't use the global one anymore */
1568  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1569  txn, command_id);
1570  }
1571 
1572  snapshot_now->curcid = command_id;
1573 
1574  TeardownHistoricSnapshot(false);
1575  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1576 
1577  /*
1578  * Every time the CommandId is incremented, we could
1579  * see new catalog contents, so execute all
1580  * invalidations.
1581  */
1583  }
1584 
1585  break;
1586 
1588  elog(ERROR, "tuplecid value in changequeue");
1589  break;
1590  }
1591  }
1592 
1593  /*
1594  * There's a speculative insertion remaining, just clean in up, it
1595  * can't have been successful, otherwise we'd gotten a confirmation
1596  * record.
1597  */
1598  if (specinsert)
1599  {
1600  ReorderBufferReturnChange(rb, specinsert);
1601  specinsert = NULL;
1602  }
1603 
1604  /* clean up the iterator */
1605  ReorderBufferIterTXNFinish(rb, iterstate);
1606  iterstate = NULL;
1607 
1608  /* call commit callback */
1609  rb->commit(rb, txn, commit_lsn);
1610 
1611  /* this is just a sanity check against bad output plugin behaviour */
1613  elog(ERROR, "output plugin used XID %u",
1615 
1616  /* cleanup */
1617  TeardownHistoricSnapshot(false);
1618 
1619  /*
1620  * Aborting the current (sub-)transaction as a whole has the right
1621  * semantics. We want all locks acquired in here to be released, not
1622  * reassigned to the parent and we do not want any database access
1623  * have persistent effects.
1624  */
1626 
1627  /* make sure there's no cache pollution */
1629 
1630  if (using_subtxn)
1632 
1633  if (snapshot_now->copied)
1634  ReorderBufferFreeSnap(rb, snapshot_now);
1635 
1636  /* remove potential on-disk data, and deallocate */
1637  ReorderBufferCleanupTXN(rb, txn);
1638  }
1639  PG_CATCH();
1640  {
1641  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1642  if (iterstate)
1643  ReorderBufferIterTXNFinish(rb, iterstate);
1644 
1646 
1647  /*
1648  * Force cache invalidation to happen outside of a valid transaction
1649  * to prevent catalog access as we just caught an error.
1650  */
1652 
1653  /* make sure there's no cache pollution */
1655 
1656  if (using_subtxn)
1658 
1659  if (snapshot_now->copied)
1660  ReorderBufferFreeSnap(rb, snapshot_now);
1661 
1662  /* remove potential on-disk data, and deallocate */
1663  ReorderBufferCleanupTXN(rb, txn);
1664 
1665  PG_RE_THROW();
1666  }
1667  PG_END_TRY();
1668 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:411
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2986
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:94
struct ReorderBufferChange::@86::@87 tp
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4322
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1965
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:576
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:413
#define ERROR
Definition: elog.h:43
struct ReorderBufferChange::@86::@88 msg
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4157
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:434
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2156
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:414
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:96
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2677
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4053
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1949
#define elog
Definition: elog.h:219
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2067
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 745 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeCommit().

748 {
749  ReorderBufferTXN *txn;
750  ReorderBufferTXN *subtxn;
751 
752  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
753  InvalidXLogRecPtr, false);
754 
755  /*
756  * No need to do anything if that subtxn didn't contain any changes
757  */
758  if (!subtxn)
759  return;
760 
761  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
762 
763  if (txn == NULL)
764  elog(ERROR, "subxact logged without previous toplevel record");
765 
766  /*
767  * Pass our base snapshot to the parent transaction if it doesn't have
768  * one, or ours is older. That can happen if there are no changes in the
769  * toplevel transaction but in one of the child transactions. This allows
770  * the parent to simply use its base snapshot initially.
771  */
772  if (subtxn->base_snapshot != NULL &&
773  (txn->base_snapshot == NULL ||
774  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
775  {
776  txn->base_snapshot = subtxn->base_snapshot;
777  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
778  subtxn->base_snapshot = NULL;
780  }
781 
782  subtxn->final_lsn = commit_lsn;
783  subtxn->end_lsn = end_lsn;
784 
785  if (!subtxn->is_known_as_subxact)
786  {
787  subtxn->is_known_as_subxact = true;
788  Assert(subtxn->nsubtxns == 0);
789 
790  /* remove from lsn order list of top-level transactions */
791  dlist_delete(&subtxn->node);
792 
793  /* add to subtransaction list */
794  dlist_push_tail(&txn->subtxns, &subtxn->node);
795  txn->nsubtxns++;
796  }
797 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1231 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1233 {
1234  Snapshot snap;
1235  dlist_iter iter;
1236  int i = 0;
1237  Size size;
1238 
1239  size = sizeof(SnapshotData) +
1240  sizeof(TransactionId) * orig_snap->xcnt +
1241  sizeof(TransactionId) * (txn->nsubtxns + 1);
1242 
1243  snap = MemoryContextAllocZero(rb->context, size);
1244  memcpy(snap, orig_snap, sizeof(SnapshotData));
1245 
1246  snap->copied = true;
1247  snap->active_count = 1; /* mark as active so nobody frees it */
1248  snap->regd_count = 0;
1249  snap->xip = (TransactionId *) (snap + 1);
1250 
1251  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1252 
1253  /*
1254  * snap->subxip contains all txids that belong to our transaction which we
1255  * need to check via cmin/cmax. Thats why we store the toplevel
1256  * transaction in there as well.
1257  */
1258  snap->subxip = snap->xip + snap->xcnt;
1259  snap->subxip[i++] = txn->xid;
1260 
1261  /*
1262  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1263  * Since it's an upper boundary it is safe to use it for the allocation
1264  * above.
1265  */
1266  snap->subxcnt = 1;
1267 
1268  dlist_foreach(iter, &txn->subtxns)
1269  {
1270  ReorderBufferTXN *sub_txn;
1271 
1272  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1273  snap->subxip[i++] = sub_txn->xid;
1274  snap->subxcnt++;
1275  }
1276 
1277  /* sort so we can bsearch() later */
1278  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1279 
1280  /* store the specified current CommandId */
1281  snap->curcid = cid;
1282 
1283  return snap;
1284 }
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:108
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:77
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
CommandId curcid
Definition: snapshot.h:96
size_t Size
Definition: c.h:356
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:78
int i
#define qsort(a, b, c, d)
Definition: port.h:440
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1947 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

1948 {
1949  int i;
1950 
1951  for (i = 0; i < txn->ninvalidations; i++)
1953 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:547
int i
void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1752 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, NULL, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1753 {
1754  ReorderBufferTXN *txn;
1755 
1756  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1757  false);
1758 
1759  /* unknown, nothing to forget */
1760  if (txn == NULL)
1761  return;
1762 
1763  /* cosmetic... */
1764  txn->final_lsn = lsn;
1765 
1766  /*
1767  * Process cache invalidation messages if there are any. Even if we're not
1768  * interested in the transaction's contents, it could have manipulated the
1769  * catalog and we need to update the caches according to that.
1770  */
1771  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1773  txn->invalidations);
1774  else
1775  Assert(txn->ninvalidations == 0);
1776 
1777  /* remove potential on-disk data, and deallocate */
1778  ReorderBufferCleanupTXN(rb, txn);
1779 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 279 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

280 {
281  MemoryContext context = rb->context;
282 
283  /*
284  * We free separately allocated data by entirely scrapping reorderbuffer's
285  * memory context.
286  */
287  MemoryContextDelete(context);
288 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
MemoryContext context
static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1290 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1291 {
1292  if (snap->copied)
1293  pfree(snap);
1294  else
1296 }
bool copied
Definition: snapshot.h:94
void pfree(void *pointer)
Definition: mcxt.c:950
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 347 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

348 {
349  ReorderBufferChange *change;
350 
351  change = (ReorderBufferChange *)
353 
354  memset(change, 0, sizeof(ReorderBufferChange));
355  return change;
356 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 677 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, NULL, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

678 {
679  ReorderBufferTXN *txn;
680 
682  return NULL;
683 
684  AssertTXNLsnOrder(rb);
685 
687 
690  return txn;
691 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 416 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, Assert, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, MaxHeapTupleSize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBufferTupleBufData, SizeofHeapTupleHeader, slist_container, slist_pop_head_node(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

417 {
418  ReorderBufferTupleBuf *tuple;
419  Size alloc_len;
420 
421  alloc_len = tuple_len + SizeofHeapTupleHeader;
422 
423  /*
424  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
425  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
426  * generated for oldtuples can be bigger, as they don't have out-of-line
427  * toast columns.
428  */
429  if (alloc_len < MaxHeapTupleSize)
430  alloc_len = MaxHeapTupleSize;
431 
432 
433  /* if small enough, check the slab cache */
434  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
435  {
436  rb->nr_cached_tuplebufs--;
440 #ifdef USE_ASSERT_CHECKING
441  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
443 #endif
444  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
445 #ifdef USE_ASSERT_CHECKING
446  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
448 #endif
449  }
450  else
451  {
452  tuple = (ReorderBufferTupleBuf *)
454  sizeof(ReorderBufferTupleBuf) +
455  MAXIMUM_ALIGNOF + alloc_len);
456  tuple->alloc_tuple_size = alloc_len;
457  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
458  }
459 
460  return tuple;
461 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
slist_head cached_tuplebufs
static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 294 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

295 {
296  ReorderBufferTXN *txn;
297 
298  txn = (ReorderBufferTXN *)
300 
301  memset(txn, 0, sizeof(ReorderBufferTXN));
302 
303  dlist_init(&txn->changes);
304  dlist_init(&txn->tuplecids);
305  dlist_init(&txn->subtxns);
306 
307  return txn;
308 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
MemoryContext txn_context
dlist_head tuplecids
void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1788 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1790 {
1791  bool use_subtxn = IsTransactionOrTransactionBlock();
1792  int i;
1793 
1794  if (use_subtxn)
1795  BeginInternalSubTransaction("replay");
1796 
1797  /*
1798  * Force invalidations to happen outside of a valid transaction - that way
1799  * entries will just be marked as invalid without accessing the catalog.
1800  * That's advantageous because we don't need to setup the full state
1801  * necessary for catalog access.
1802  */
1803  if (use_subtxn)
1805 
1806  for (i = 0; i < ninvalidations; i++)
1807  LocalExecuteInvalidationMessage(&invalidations[i]);
1808 
1809  if (use_subtxn)
1811 }
void AbortCurrentTransaction(void)
Definition: xact.c:2986
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4322
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4157
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4053
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:547
int i
static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 816 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

817 {
819  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
820  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
821 
822  if (pos_a < pos_b)
823  return 1;
824  else if (pos_a == pos_b)
825  return 0;
826  return -1;
827 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:478
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg
static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1043 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1045 {
1046  int32 off;
1047 
1048  for (off = 0; off < state->nr_txns; off++)
1049  {
1050  if (state->entries[off].fd != -1)
1051  CloseTransientFile(state->entries[off].fd);
1052  }
1053 
1054  /* free memory we might have "leaked" in the last *Next call */
1055  if (!dlist_is_empty(&state->old_change))
1056  {
1057  ReorderBufferChange *change;
1058 
1059  change = dlist_container(ReorderBufferChange, node,
1060  dlist_pop_head_node(&state->old_change));
1061  ReorderBufferReturnChange(rb, change);
1062  Assert(dlist_is_empty(&state->old_change));
1063  }
1064 
1065  binaryheap_free(state->heap);
1066  pfree(state);
1067 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:256
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
int CloseTransientFile(int fd)
Definition: fd.c:2268
#define Assert(condition)
Definition: c.h:675
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 834 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::subtxns, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

835 {
836  Size nr_txns = 0;
838  dlist_iter cur_txn_i;
839  int32 off;
840 
841  /*
842  * Calculate the size of our heap: one element for every transaction that
843  * contains changes. (Besides the transactions already in the reorder
844  * buffer, we count the one we were directly passed.)
845  */
846  if (txn->nentries > 0)
847  nr_txns++;
848 
849  dlist_foreach(cur_txn_i, &txn->subtxns)
850  {
851  ReorderBufferTXN *cur_txn;
852 
853  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
854 
855  if (cur_txn->nentries > 0)
856  nr_txns++;
857  }
858 
859  /*
860  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
861  * need to allocate/build a heap then.
862  */
863 
864  /* allocate iteration state */
865  state = (ReorderBufferIterTXNState *)
867  sizeof(ReorderBufferIterTXNState) +
868  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
869 
870  state->nr_txns = nr_txns;
871  dlist_init(&state->old_change);
872 
873  for (off = 0; off < state->nr_txns; off++)
874  {
875  state->entries[off].fd = -1;
876  state->entries[off].segno = 0;
877  }
878 
879  /* allocate heap */
880  state->heap = binaryheap_allocate(state->nr_txns,
882  state);
883 
884  /*
885  * Now insert items into the binary heap, in an unordered fashion. (We
886  * will run a heap assembly step at the end; this is more efficient.)
887  */
888 
889  off = 0;
890 
891  /* add toplevel transaction if it contains changes */
892  if (txn->nentries > 0)
893  {
894  ReorderBufferChange *cur_change;
895 
896  if (txn->nentries != txn->nentries_mem)
897  {
898  /* serialize remaining changes */
899  ReorderBufferSerializeTXN(rb, txn);
900  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
901  &state->entries[off].segno);
902  }
903 
904  cur_change = dlist_head_element(ReorderBufferChange, node,
905  &txn->changes);
906 
907  state->entries[off].lsn = cur_change->lsn;
908  state->entries[off].change = cur_change;
909  state->entries[off].txn = txn;
910 
912  }
913 
914  /* add subtransactions if they contain changes */
915  dlist_foreach(cur_txn_i, &txn->subtxns)
916  {
917  ReorderBufferTXN *cur_txn;
918 
919  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
920 
921  if (cur_txn->nentries > 0)
922  {
923  ReorderBufferChange *cur_change;
924 
925  if (cur_txn->nentries != cur_txn->nentries_mem)
926  {
927  /* serialize remaining changes */
928  ReorderBufferSerializeTXN(rb, cur_txn);
929  ReorderBufferRestoreChanges(rb, cur_txn,
930  &state->entries[off].fd,
931  &state->entries[off].segno);
932  }
933  cur_change = dlist_head_element(ReorderBufferChange, node,
934  &cur_txn->changes);
935 
936  state->entries[off].lsn = cur_change->lsn;
937  state->entries[off].change = cur_change;
938  state->entries[off].txn = cur_txn;
939 
941  }
942  }
943 
944  /* assemble a valid binary heap */
945  binaryheap_build(state->heap);
946 
947  return state;
948 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:256
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:356
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:485
static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 957 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, NULL, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

958 {
959  ReorderBufferChange *change;
961  int32 off;
962 
963  /* nothing there anymore */
964  if (state->heap->bh_size == 0)
965  return NULL;
966 
967  off = DatumGetInt32(binaryheap_first(state->heap));
968  entry = &state->entries[off];
969 
970  /* free memory we might have "leaked" in the previous *Next call */
971  if (!dlist_is_empty(&state->old_change))
972  {
973  change = dlist_container(ReorderBufferChange, node,
975  ReorderBufferReturnChange(rb, change);
976  Assert(dlist_is_empty(&state->old_change));
977  }
978 
979  change = entry->change;
980 
981  /*
982  * update heap with information about which transaction has the next
983  * relevant change in LSN order
984  */
985 
986  /* there are in-memory changes */
987  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
988  {
989  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
990  ReorderBufferChange *next_change =
992 
993  /* txn stays the same */
994  state->entries[off].lsn = next_change->lsn;
995  state->entries[off].change = next_change;
996 
998  return change;
999  }
1000 
1001  /* try to load changes from disk */
1002  if (entry->txn->nentries != entry->txn->nentries_mem)
1003  {
1004  /*
1005  * Ugly: restoring changes will reuse *Change records, thus delete the
1006  * current one from the per-tx list and only free in the next call.
1007  */
1008  dlist_delete(&change->node);
1009  dlist_push_tail(&state->old_change, &change->node);
1010 
1011  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1012  &state->entries[off].segno))
1013  {
1014  /* successfully restored changes from disk */
1015  ReorderBufferChange *next_change =
1017  &entry->txn->changes);
1018 
1019  elog(DEBUG2, "restored %u/%u changes from disk",
1020  (uint32) entry->txn->nentries_mem,
1021  (uint32) entry->txn->nentries);
1022 
1023  Assert(entry->txn->nentries_mem);
1024  /* txn stays the same */
1025  state->entries[off].lsn = next_change->lsn;
1026  state->entries[off].change = next_change;
1028 
1029  return change;
1030  }
1031  }
1032 
1033  /* ok, no changes there anymore, remove */
1034  binaryheap_remove_first(state->heap);
1035 
1036  return change;
1037 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:210
#define DatumGetInt32(X)
Definition: postgres.h:478
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:256
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:485
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define elog
Definition: elog.h:219
void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1824 of file reorderbuffer.c.

References InvalidTransactionId, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1825 {
1826  /* many records won't have an xid assigned, centralize check here */
1827  if (xid != InvalidTransactionId)
1828  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1829 }
#define InvalidTransactionId
Definition: transam.h:31
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 579 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, NULL, ReorderBufferCheckSerializeTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

581 {
582  ReorderBufferTXN *txn;
583 
584  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
585 
586  change->lsn = lsn;
587  Assert(InvalidXLogRecPtr != lsn);
588  dlist_push_tail(&txn->changes, &change->node);
589  txn->nentries++;
590  txn->nentries_mem++;
591 
593 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 599 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, NULL, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by DecodeLogicalMsgOp().

603 {
604  if (transactional)
605  {
606  MemoryContext oldcontext;
607  ReorderBufferChange *change;
608 
610 
611  oldcontext = MemoryContextSwitchTo(rb->context);
612 
613  change = ReorderBufferGetChange(rb);
615  change->data.msg.prefix = pstrdup(prefix);
616  change->data.msg.message_size = message_size;
617  change->data.msg.message = palloc(message_size);
618  memcpy(change->data.msg.message, message, message_size);
619 
620  ReorderBufferQueueChange(rb, xid, lsn, change);
621 
622  MemoryContextSwitchTo(oldcontext);
623  }
624  else
625  {
626  ReorderBufferTXN *txn = NULL;
627  volatile Snapshot snapshot_now = snapshot;
628 
629  if (xid != InvalidTransactionId)
630  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
631 
632  /* setup snapshot to allow catalog access */
633  SetupHistoricSnapshot(snapshot_now, NULL);
634  PG_TRY();
635  {
636  rb->message(rb, txn, lsn, false, prefix, message_size, message);
637 
639  }
640  PG_CATCH();
641  {
643  PG_RE_THROW();
644  }
645  PG_END_TRY();
646  }
647 }
char * pstrdup(const char *in)
Definition: mcxt.c:1077
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1965
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@86::@88 msg
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:849
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1949
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2437 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2439 {
2440  ReorderBufferDiskChange *ondisk;
2441  ReorderBufferChange *change;
2442 
2443  ondisk = (ReorderBufferDiskChange *) data;
2444 
2445  change = ReorderBufferGetChange(rb);
2446 
2447  /* copy static part */
2448  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449 
2450  data += sizeof(ReorderBufferDiskChange);
2451 
2452  /* restore individual stuff */
2453  switch (change->action)
2454  {
2455  /* fall through these, they're all similar enough */
2460  if (change->data.tp.oldtuple)
2461  {
2462  uint32 tuplelen = ((HeapTuple) data)->t_len;
2463 
2464  change->data.tp.oldtuple =
2466 
2467  /* restore ->tuple */
2468  memcpy(&change->data.tp.oldtuple->tuple, data,
2469  sizeof(HeapTupleData));
2470  data += sizeof(HeapTupleData);
2471 
2472  /* reset t_data pointer into the new tuplebuf */
2473  change->data.tp.oldtuple->tuple.t_data =
2474  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475 
2476  /* restore tuple data itself */
2477  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478  data += tuplelen;
2479  }
2480 
2481  if (change->data.tp.newtuple)
2482  {
2483  /* here, data might not be suitably aligned! */
2484  uint32 tuplelen;
2485 
2486  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487  sizeof(uint32));
2488 
2489  change->data.tp.newtuple =
2491 
2492  /* restore ->tuple */
2493  memcpy(&change->data.tp.newtuple->tuple, data,
2494  sizeof(HeapTupleData));
2495  data += sizeof(HeapTupleData);
2496 
2497  /* reset t_data pointer into the new tuplebuf */
2498  change->data.tp.newtuple->tuple.t_data =
2499  ReorderBufferTupleBufData(change->data.tp.newtuple);
2500 
2501  /* restore tuple data itself */
2502  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503  data += tuplelen;
2504  }
2505 
2506  break;
2508  {
2509  Size prefix_size;
2510 
2511  /* read prefix */
2512  memcpy(&prefix_size, data, sizeof(Size));
2513  data += sizeof(Size);
2514  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515  prefix_size);
2516  memcpy(change->data.msg.prefix, data, prefix_size);
2517  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518  data += prefix_size;
2519 
2520  /* read the message */
2521  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522  data += sizeof(Size);
2523  change->data.msg.message = MemoryContextAlloc(rb->context,
2524  change->data.msg.message_size);
2525  memcpy(change->data.msg.message, data,
2526  change->data.msg.message_size);
2527  data += change->data.msg.message_size;
2528 
2529  break;
2530  }
2532  {
2533  Snapshot oldsnap;
2534  Snapshot newsnap;
2535  Size size;
2536 
2537  oldsnap = (Snapshot) data;
2538 
2539  size = sizeof(SnapshotData) +
2540  sizeof(TransactionId) * oldsnap->xcnt +
2541  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542 
2543  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544 
2545  newsnap = change->data.snapshot;
2546 
2547  memcpy(newsnap, data, size);
2548  newsnap->xip = (TransactionId *)
2549  (((char *) newsnap) + sizeof(SnapshotData));
2550  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551  newsnap->copied = true;
2552  break;
2553  }
2554  /* the base struct contains all the data, easy peasy */
2558  break;
2559  }
2560 
2561  dlist_push_tail(&txn->changes, &change->node);
2562  txn->nentries_mem++;
2563 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
HeapTupleData * HeapTuple
Definition: htup.h:70
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
struct ReorderBufferChange::@86::@87 tp
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:23
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
struct ReorderBufferChange::@86::@88 msg
dlist_head changes
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:268
TransactionId * xip
Definition: snapshot.h:77
MemoryContext context
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
uint32 xcnt
Definition: snapshot.h:78
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:555
TransactionId * subxip
Definition: snapshot.h:89
int32 subxcnt
Definition: snapshot.h:90
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2300 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, ReplicationSlot::data, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2302 {
2303  Size restored = 0;
2304  XLogSegNo last_segno;
2305  dlist_mutable_iter cleanup_iter;
2306 
2309 
2310  /* free current entries, so we have memory for more */
2311  dlist_foreach_modify(cleanup_iter, &txn->changes)
2312  {
2314  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315 
2316  dlist_delete(&cleanup->node);
2317  ReorderBufferReturnChange(rb, cleanup);
2318  }
2319  txn->nentries_mem = 0;
2320  Assert(dlist_is_empty(&txn->changes));
2321 
2322  XLByteToSeg(txn->final_lsn, last_segno);
2323 
2324  while (restored < max_changes_in_memory && *segno <= last_segno)
2325  {
2326  int readBytes;
2327  ReorderBufferDiskChange *ondisk;
2328 
2329  if (*fd == -1)
2330  {
2331  XLogRecPtr recptr;
2332  char path[MAXPGPATH];
2333 
2334  /* first time in */
2335  if (*segno == 0)
2336  {
2337  XLByteToSeg(txn->first_lsn, *segno);
2338  }
2339 
2340  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342 
2343  /*
2344  * No need to care about TLIs here, only used during a single run,
2345  * so each LSN only maps to a specific WAL record.
2346  */
2347  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2349  (uint32) (recptr >> 32), (uint32) recptr);
2350 
2351  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352  if (*fd < 0 && errno == ENOENT)
2353  {
2354  *fd = -1;
2355  (*segno)++;
2356  continue;
2357  }
2358  else if (*fd < 0)
2359  ereport(ERROR,
2361  errmsg("could not open file \"%s\": %m",
2362  path)));
2363 
2364  }
2365 
2366  /*
2367  * Read the statically sized part of a change which has information
2368  * about the total size. If we couldn't read a record, we're at the
2369  * end of this file.
2370  */
2373  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2375 
2376  /* eof */
2377  if (readBytes == 0)
2378  {
2380  *fd = -1;
2381  (*segno)++;
2382  continue;
2383  }
2384  else if (readBytes < 0)
2385  ereport(ERROR,
2387  errmsg("could not read from reorderbuffer spill file: %m")));
2388  else if (readBytes != sizeof(ReorderBufferDiskChange))
2389  ereport(ERROR,
2391  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392  readBytes,
2393  (uint32) sizeof(ReorderBufferDiskChange))));
2394 
2395  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396 
2398  sizeof(ReorderBufferDiskChange) + ondisk->size);
2399  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400 
2402  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403  ondisk->size - sizeof(ReorderBufferDiskChange));
2405 
2406  if (readBytes < 0)
2407  ereport(ERROR,
2409  errmsg("could not read from reorderbuffer spill file: %m")));
2410  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411  ereport(ERROR,
2413  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414  readBytes,
2415  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416 
2417  /*
2418  * ok, read a full change from disk, now restore it into proper
2419  * in-memory format
2420  */
2421  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422  restored++;
2423  }
2424 
2425  return restored;
2426 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
ReplicationSlotPersistentData data
Definition: slot.h:115
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2268
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:848
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:18
static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2569 of file reorderbuffer.c.

References Assert, cur, ReplicationSlot::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, unlink(), ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCleanupTXN().

2570 {
2571  XLogSegNo first;
2572  XLogSegNo cur;
2573  XLogSegNo last;
2574 
2577 
2578  XLByteToSeg(txn->first_lsn, first);
2579  XLByteToSeg(txn->final_lsn, last);
2580 
2581  /* iterate over all possible filenames, and delete them */
2582  for (cur = first; cur <= last; cur++)
2583  {
2584  char path[MAXPGPATH];
2585  XLogRecPtr recptr;
2586 
2587  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588 
2589  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2591  (uint32) (recptr >> 32), (uint32) recptr);
2592  if (unlink(path) != 0 && errno != ENOENT)
2593  ereport(ERROR,
2595  errmsg("could not remove file \"%s\": %m", path)));
2596  }
2597 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct cursor * cur
Definition: ecpg.c:28
ReplicationSlotPersistentData data
Definition: slot.h:115
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
XLogRecPtr final_lsn
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 365 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, NULL, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

366 {
367  /* free contained data */
368  switch (change->action)
369  {
374  if (change->data.tp.newtuple)
375  {
376  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
377  change->data.tp.newtuple = NULL;
378  }
379 
380  if (change->data.tp.oldtuple)
381  {
382  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
383  change->data.tp.oldtuple = NULL;
384  }
385  break;
387  if (change->data.msg.prefix != NULL)
388  pfree(change->data.msg.prefix);
389  change->data.msg.prefix = NULL;
390  if (change->data.msg.message != NULL)
391  pfree(change->data.msg.message);
392  change->data.msg.message = NULL;
393  break;
395  if (change->data.snapshot)
396  {
397  ReorderBufferFreeSnap(rb, change->data.snapshot);
398  change->data.snapshot = NULL;
399  }
400  break;
401  /* no data in addition to the struct itself */
405  break;
406  }
407 
408  pfree(change);
409 }
struct ReorderBufferChange::@86::@87 tp
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void pfree(void *pointer)
Definition: mcxt.c:950
struct ReorderBufferChange::@86::@88 msg
#define NULL
Definition: c.h:229
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 470 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, ReorderBuffer::cached_tuplebufs, max_cached_tuplebufs, MaxHeapTupleSize, ReorderBufferTupleBuf::node, ReorderBuffer::nr_cached_tuplebufs, pfree(), slist_push_head(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferReturnChange().

471 {
472  /* check whether to put into the slab cache, oversized tuples never are */
473  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
475  {
476  rb->nr_cached_tuplebufs++;
477  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
480  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
482  }
483  else
484  {
485  pfree(tuple);
486  }
487 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static const Size max_cached_tuplebufs
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:950
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
Size nr_cached_tuplebufs
slist_head cached_tuplebufs
static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 317 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, NULL, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

318 {
319  /* clean the lookup cache if we were cached (quite likely) */
320  if (rb->by_txn_last_xid == txn->xid)
321  {
323  rb->by_txn_last_txn = NULL;
324  }
325 
326  /* free data that's contained */
327 
328  if (txn->tuplecid_hash != NULL)
329  {
331  txn->tuplecid_hash = NULL;
332  }
333 
334  if (txn->invalidations)
335  {
336  pfree(txn->invalidations);
337  txn->invalidations = NULL;
338  }
339 
340  pfree(txn);
341 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:950
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
#define NULL
Definition: c.h:229
SharedInvalidationMessage * invalidations
static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2136 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2138 {
2139  ReorderBufferDiskChange *ondisk;
2140  Size sz = sizeof(ReorderBufferDiskChange);
2141 
2143 
2144  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146 
2147  switch (change->action)
2148  {
2149  /* fall through these, they're all similar enough */
2154  {
2155  char *data;
2156  ReorderBufferTupleBuf *oldtup,
2157  *newtup;
2158  Size oldlen = 0;
2159  Size newlen = 0;
2160 
2161  oldtup = change->data.tp.oldtuple;
2162  newtup = change->data.tp.newtuple;
2163 
2164  if (oldtup)
2165  {
2166  sz += sizeof(HeapTupleData);
2167  oldlen = oldtup->tuple.t_len;
2168  sz += oldlen;
2169  }
2170 
2171  if (newtup)
2172  {
2173  sz += sizeof(HeapTupleData);
2174  newlen = newtup->tuple.t_len;
2175  sz += newlen;
2176  }
2177 
2178  /* make sure we have enough space */
2180 
2181  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182  /* might have been reallocated above */
2183  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184 
2185  if (oldlen)
2186  {
2187  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188  data += sizeof(HeapTupleData);
2189 
2190  memcpy(data, oldtup->tuple.t_data, oldlen);
2191  data += oldlen;
2192  }
2193 
2194  if (newlen)
2195  {
2196  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197  data += sizeof(HeapTupleData);
2198 
2199  memcpy(data, newtup->tuple.t_data, newlen);
2200  data += newlen;
2201  }
2202  break;
2203  }
2205  {
2206  char *data;
2207  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2208 
2209  sz += prefix_size + change->data.msg.message_size +
2210  sizeof(Size) + sizeof(Size);
2212 
2213  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214 
2215  /* might have been reallocated above */
2216  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217 
2218  /* write the prefix including the size */
2219  memcpy(data, &prefix_size, sizeof(Size));
2220  data += sizeof(Size);
2221  memcpy(data, change->data.msg.prefix,
2222  prefix_size);
2223  data += prefix_size;
2224 
2225  /* write the message including the size */
2226  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227  data += sizeof(Size);
2228  memcpy(data, change->data.msg.message,
2229  change->data.msg.message_size);
2230  data += change->data.msg.message_size;
2231 
2232  break;
2233  }
2235  {
2236  Snapshot snap;
2237  char *data;
2238 
2239  snap = change->data.snapshot;
2240 
2241  sz += sizeof(SnapshotData) +
2242  sizeof(TransactionId) * snap->xcnt +
2243  sizeof(TransactionId) * snap->subxcnt
2244  ;
2245 
2246  /* make sure we have enough space */
2248  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249  /* might have been reallocated above */
2250  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251 
2252  memcpy(data, snap, sizeof(SnapshotData));
2253  data += sizeof(SnapshotData);
2254 
2255  if (snap->xcnt)
2256  {
2257  memcpy(data, snap->xip,
2258  sizeof(TransactionId) * snap->xcnt);
2259  data += sizeof(TransactionId) * snap->xcnt;
2260  }
2261 
2262  if (snap->subxcnt)
2263  {
2264  memcpy(data, snap->subxip,
2265  sizeof(TransactionId) * snap->subxcnt);
2266  data += sizeof(TransactionId) * snap->subxcnt;
2267  }
2268  break;
2269  }
2273  /* ReorderBufferChange contains everything important */
2274  break;
2275  }
2276 
2277  ondisk->size = sz;
2278 
2280  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281  {
2282  int save_errno = errno;
2283 
2285  errno = save_errno;
2286  ereport(ERROR,
2288  errmsg("could not write to data file for XID %u: %m",
2289  txn->xid)));
2290  }
2292 
2293  Assert(ondisk->change.action == change->action);
2294 }
uint32 TransactionId
Definition: c.h:397
struct ReorderBufferChange::@86::@87 tp
#define write(a, b, c)
Definition: win32.h:19
union ReorderBufferChange::@86 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:67
#define ERROR
Definition: elog.h:43
struct ReorderBufferChange::@86::@88 msg
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1205
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId * xip
Definition: snapshot.h:77
int CloseTransientFile(int fd)
Definition: fd.c:2268
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1181
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:797
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:89
int32 subxcnt
Definition: snapshot.h:90
static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2020 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2021 {
2022  if (!rb->outbufsize)
2023  {
2024  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2025  rb->outbufsize = sz;
2026  }
2027  else if (rb->outbufsize < sz)
2028  {
2029  rb->outbuf = repalloc(rb->outbuf, sz);
2030  rb->outbufsize = sz;
2031  }
2032 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:963
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2055 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, ReplicationSlot::data, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferTXN::subtxns, ReorderBufferTXN::xid, XLByteInSeg, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

2056 {
2057  dlist_iter subtxn_i;
2058  dlist_mutable_iter change_i;
2059  int fd = -1;
2060  XLogSegNo curOpenSegNo = 0;
2061  Size spilled = 0;
2062  char path[MAXPGPATH];
2063 
2064  elog(DEBUG2, "spill %u changes in XID %u to disk",
2065  (uint32) txn->nentries_mem, txn->xid);
2066 
2067  /* do the same to all child TXs */
2068  dlist_foreach(subtxn_i, &txn->subtxns)
2069  {
2070  ReorderBufferTXN *subtxn;
2071 
2072  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2073  ReorderBufferSerializeTXN(rb, subtxn);
2074  }
2075 
2076  /* serialize changestream */
2077  dlist_foreach_modify(change_i, &txn->changes)
2078  {
2079  ReorderBufferChange *change;
2080 
2081  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2082 
2083  /*
2084  * store in segment in which it belongs by start lsn, don't split over
2085  * multiple segments tho
2086  */
2087  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2088  {
2089  XLogRecPtr recptr;
2090 
2091  if (fd != -1)
2092  CloseTransientFile(fd);
2093 
2094  XLByteToSeg(change->lsn, curOpenSegNo);
2095  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2096 
2097  /*
2098  * No need to care about TLIs here, only used during a single run,
2099  * so each LSN only maps to a specific WAL record.
2100  */
2101  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2103  (uint32) (recptr >> 32), (uint32) recptr);
2104 
2105  /* open segment, create it if necessary */
2106  fd = OpenTransientFile(path,
2107  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2108  S_IRUSR | S_IWUSR);
2109 
2110  if (fd < 0)
2111  ereport(ERROR,
2113  errmsg("could not open file \"%s\": %m",
2114  path)));
2115  }
2116 
2117  ReorderBufferSerializeChange(rb, txn, fd, change);
2118  dlist_delete(&change->node);
2119  ReorderBufferReturnChange(rb, change);
2120 
2121  spilled++;
2122  }
2123 
2124  Assert(spilled == txn->nentries_mem);
2125  Assert(dlist_is_empty(&txn->changes));
2126  txn->nentries_mem = 0;
2127 
2128  if (fd != -1)
2129  CloseTransientFile(fd);
2130 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReplicationSlotPersistentData data
Definition: slot.h:115
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2268
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:356
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
#define elog
Definition: elog.h:219
#define XLByteInSeg(xlrp, logSegNo)
void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1857 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

1859 {
1860  ReorderBufferTXN *txn;
1861  bool is_new;
1862 
1863  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1864  Assert(txn->base_snapshot == NULL);
1865  Assert(snap != NULL);
1866 
1867  txn->base_snapshot = snap;
1868  txn->base_snapshot_lsn = lsn;
1869 }
Snapshot base_snapshot
XLogRecPtr base_snapshot_lsn
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 694 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

695 {
697 }
XLogRecPtr current_restart_decoding_lsn
static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2691 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, NULL, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

2693 {
2694  ReorderBufferToastEnt *ent;
2695  ReorderBufferTupleBuf *newtup;
2696  bool found;
2697  int32 chunksize;
2698  bool isnull;
2699  Pointer chunk;
2700  TupleDesc desc = RelationGetDescr(relation);
2701  Oid chunk_id;
2702  int32 chunk_seq;
2703 
2704  if (txn->toast_hash == NULL)
2705  ReorderBufferToastInitHash(rb, txn);
2706 
2707  Assert(IsToastRelation(relation));
2708 
2709  newtup = change->data.tp.newtuple;
2710  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711  Assert(!isnull);
2712  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713  Assert(!isnull);
2714 
2715  ent = (ReorderBufferToastEnt *)
2716  hash_search(txn->toast_hash,
2717  (void *) &chunk_id,
2718  HASH_ENTER,
2719  &found);
2720 
2721  if (!found)
2722  {
2723  Assert(ent->chunk_id == chunk_id);
2724  ent->num_chunks = 0;
2725  ent->last_chunk_seq = 0;
2726  ent->size = 0;
2727  ent->reconstructed = NULL;
2728  dlist_init(&ent->chunks);
2729 
2730  if (chunk_seq != 0)
2731  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732  chunk_seq, chunk_id);
2733  }
2734  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737 
2738  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739  Assert(!isnull);
2740 
2741  /* calculate size so we can allocate the right size at once later */
2742  if (!VARATT_IS_EXTENDED(chunk))
2743  chunksize = VARSIZE(chunk) - VARHDRSZ;
2744  else if (VARATT_IS_SHORT(chunk))
2745  /* could happen due to heap_form_tuple doing its thing */
2746  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747  else
2748  elog(ERROR, "unexpected type of toast chunk");
2749 
2750  ent->size += chunksize;
2751  ent->last_chunk_seq = chunk_seq;
2752  ent->num_chunks++;
2753  dlist_push_tail(&ent->chunks, &change->node);
2754 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define DatumGetInt32(X)
Definition: postgres.h:478
#define RelationGetDescr(relation)
Definition: rel.h:429
struct ReorderBufferChange::@86::@87 tp
#define VARHDRSZ_SHORT
Definition: postgres.h:269
#define VARSIZE(PTR)
Definition: postgres.h:304
#define VARHDRSZ
Definition: c.h:445
#define DatumGetObjectId(X)
Definition: postgres.h:506
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
unsigned int Oid
Definition: postgres_ext.h:31
union ReorderBufferChange::@86 data
signed int int32
Definition: c.h:256
char * Pointer
Definition: c.h:245
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:306
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:326
#define DatumGetPointer(X)
Definition: postgres.h:555
#define elog
Definition: elog.h:219
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2670 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, NULL, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2671 {
2672  HASHCTL hash_ctl;
2673 
2674  Assert(txn->toast_hash == NULL);
2675 
2676  memset(&hash_ctl, 0, sizeof(hash_ctl));
2677  hash_ctl.keysize = sizeof(Oid);
2678  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679  hash_ctl.hcxt = rb->context;
2680  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2682 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2764 of file reorderbuffer.c.

References Assert, tupleDesc::attrs, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), tupleDesc::natts, NULL, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

2766 {
2767  TupleDesc desc;
2768  int natt;
2769  Datum *attrs;
2770  bool *isnull;
2771  bool *free;
2772  HeapTuple tmphtup;
2773  Relation toast_rel;
2774  TupleDesc toast_desc;
2775  MemoryContext oldcontext;
2776  ReorderBufferTupleBuf *newtup;
2777 
2778  /* no toast tuples changed */
2779  if (txn->toast_hash == NULL)
2780  return;
2781 
2782  oldcontext = MemoryContextSwitchTo(rb->context);
2783 
2784  /* we should only have toast tuples in an INSERT or UPDATE */
2785  Assert(change->data.tp.newtuple);
2786 
2787  desc = RelationGetDescr(relation);
2788 
2789  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790  toast_desc = RelationGetDescr(toast_rel);
2791 
2792  /* should we allocate from stack instead? */
2793  attrs = palloc0(sizeof(Datum) * desc->natts);
2794  isnull = palloc0(sizeof(bool) * desc->natts);
2795  free = palloc0(sizeof(bool) * desc->natts);
2796 
2797  newtup = change->data.tp.newtuple;
2798 
2799  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800 
2801  for (natt = 0; natt < desc->natts; natt++)
2802  {
2803  Form_pg_attribute attr = desc->attrs[natt];
2804  ReorderBufferToastEnt *ent;
2805  struct varlena *varlena;
2806 
2807  /* va_rawsize is the size of the original datum -- including header */
2808  struct varatt_external toast_pointer;
2809  struct varatt_indirect redirect_pointer;
2810  struct varlena *new_datum = NULL;
2811  struct varlena *reconstructed;
2812  dlist_iter it;
2813  Size data_done = 0;
2814 
2815  /* system columns aren't toasted */
2816  if (attr->attnum < 0)
2817  continue;
2818 
2819  if (attr->attisdropped)
2820  continue;
2821 
2822  /* not a varlena datatype */
2823  if (attr->attlen != -1)
2824  continue;
2825 
2826  /* no data */
2827  if (isnull[natt])
2828  continue;
2829 
2830  /* ok, we know we have a toast datum */
2831  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832 
2833  /* no need to do anything if the tuple isn't external */
2834  if (!VARATT_IS_EXTERNAL(varlena))
2835  continue;
2836 
2837  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838 
2839  /*
2840  * Check whether the toast tuple changed, replace if so.
2841  */
2842  ent = (ReorderBufferToastEnt *)
2843  hash_search(txn->toast_hash,
2844  (void *) &toast_pointer.va_valueid,
2845  HASH_FIND,
2846  NULL);
2847  if (ent == NULL)
2848  continue;
2849 
2850  new_datum =
2851  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852 
2853  free[natt] = true;
2854 
2855  reconstructed = palloc0(toast_pointer.va_rawsize);
2856 
2857  ent->reconstructed = reconstructed;
2858 
2859  /* stitch toast tuple back together from its parts */
2860  dlist_foreach(it, &ent->chunks)
2861  {
2862  bool isnull;
2863  ReorderBufferChange *cchange;
2864  ReorderBufferTupleBuf *ctup;
2865  Pointer chunk;
2866 
2867  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868  ctup = cchange->data.tp.newtuple;
2869  chunk = DatumGetPointer(
2870  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871 
2872  Assert(!isnull);
2873  Assert(!VARATT_IS_EXTERNAL(chunk));
2874  Assert(!VARATT_IS_SHORT(chunk));
2875 
2876  memcpy(VARDATA(reconstructed) + data_done,
2877  VARDATA(chunk),
2878  VARSIZE(chunk) - VARHDRSZ);
2879  data_done += VARSIZE(chunk) - VARHDRSZ;
2880  }
2881  Assert(data_done == toast_pointer.va_extsize);
2882 
2883  /* make sure its marked as compressed or not */
2884  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886  else
2887  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888 
2889  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890  redirect_pointer.pointer = reconstructed;
2891 
2893  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894  sizeof(redirect_pointer));
2895 
2896  attrs[natt] = PointerGetDatum(new_datum);
2897  }
2898 
2899  /*
2900  * Build tuple in separate memory & copy tuple back into the tuplebuf
2901  * passed to the output plugin. We can't directly heap_fill_tuple() into
2902  * the tuplebuf because attrs[] will point back into the current content.
2903  */
2904  tmphtup = heap_form_tuple(desc, attrs, isnull);
2905  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907 
2908  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909  newtup->tuple.t_len = tmphtup->t_len;
2910 
2911  /*
2912  * free resources we won't further need, more persistent stuff will be
2913  * free'd in ReorderBufferToastReset().
2914  */
2915  RelationClose(toast_rel);
2916  pfree(tmphtup);
2917  for (natt = 0; natt < desc->natts; natt++)
2918  {
2919  if (free[natt])
2920  pfree(DatumGetPointer(attrs[natt]));
2921  }
2922  pfree(attrs);
2923  pfree(free);
2924  pfree(isnull);
2925 
2926  MemoryContextSwitchTo(oldcontext);
2927 }
#define VARDATA(PTR)
Definition: postgres.h:303
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define RelationGetDescr(relation)
Definition: rel.h:429
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
struct ReorderBufferChange::@86::@87 tp
#define VARSIZE(PTR)
Definition: postgres.h:304
#define PointerGetDatum(X)
Definition: postgres.h:562
#define VARHDRSZ
Definition: c.h:445
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
Form_pg_class rd_rel
Definition: rel.h:114
union ReorderBufferChange::@86 data
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:311
int natts
Definition: tupdesc.h:73
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:314
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
char * Pointer
Definition: c.h:245
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:332
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void RelationClose(Relation relation)
Definition: relcache.c:2156
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:878
uintptr_t Datum
Definition: postgres.h:372
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:65
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
size_t Size
Definition: c.h:356
#define DatumGetPointer(X)
Definition: postgres.h:555
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:330
Definition: c.h:439
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:328
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2067
static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2933 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, NULL, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

2934 {
2935  HASH_SEQ_STATUS hstat;
2936  ReorderBufferToastEnt *ent;
2937 
2938  if (txn->toast_hash == NULL)
2939  return;
2940 
2941  /* sequentially walk over the hash and free everything */
2942  hash_seq_init(&hstat, txn->toast_hash);
2943  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944  {
2945  dlist_mutable_iter it;
2946 
2947  if (ent->reconstructed != NULL)
2948  pfree(ent->reconstructed);
2949 
2950  dlist_foreach_modify(it, &ent->chunks)
2951  {
2952  ReorderBufferChange *change =
2954 
2955  dlist_delete(&change->node);
2956  ReorderBufferReturnChange(rb, change);
2957  }
2958  }
2959 
2960  hash_destroy(txn->toast_hash);
2961  txn->toast_hash = NULL;
2962 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:229
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 496 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, NULL, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

498 {
499  ReorderBufferTXN *txn;
501  bool found;
502 
504  Assert(!create || lsn != InvalidXLogRecPtr);
505 
506  /*
507  * Check the one-entry lookup cache first
508  */
510  rb->by_txn_last_xid == xid)
511  {
512  txn = rb->by_txn_last_txn;
513 
514  if (txn != NULL)
515  {
516  /* found it, and it's valid */
517  if (is_new)
518  *is_new = false;
519  return txn;
520  }
521 
522  /*
523  * cached as non-existent, and asked not to create? Then nothing else
524  * to do.
525  */
526  if (!create)
527  return NULL;
528  /* otherwise fall through to create it */
529  }
530 
531  /*
532  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
533  * to create an entry.
534  */
535 
536  /* search the lookup table */
537  ent = (ReorderBufferTXNByIdEnt *)
538  hash_search(rb->by_txn,
539  (void *) &xid,
540  create ? HASH_ENTER : HASH_FIND,
541  &found);
542  if (found)
543  txn = ent->txn;
544  else if (create)
545  {
546  /* initialize the new entry, if creation was requested */
547  Assert(ent != NULL);
548 
549  ent->txn = ReorderBufferGetTXN(rb);
550  ent->txn->xid = xid;
551  txn = ent->txn;
552  txn->first_lsn = lsn;
554 
555  if (create_as_top)
556  {
557  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
558  AssertTXNLsnOrder(rb);
559  }
560  }
561  else
562  txn = NULL; /* not found and not asked to create */
563 
564  /* update cache */
565  rb->by_txn_last_xid = xid;
566  rb->by_txn_last_txn = txn;
567 
568  if (is_new)
569  *is_new = !found;
570 
571  Assert(!create || txn != NULL);
572  return txn;
573 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
TransactionId xid
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
ReorderBufferTXN * txn
Definition: reorderbuffer.c:82
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1990 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1991 {
1992  ReorderBufferTXN *txn;
1993 
1994  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1995  false);
1996 
1997  /* transaction isn't known yet, ergo no snapshot */
1998  if (txn == NULL)
1999  return false;
2000 
2001  /*
2002  * TODO: It would be a nice improvement if we would check the toplevel
2003  * transaction in subtransactions, but we'd need to keep track of a bit
2004  * more state.
2005  */
2006  return txn->base_snapshot != NULL;
2007 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1974 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn().

1975 {
1976  ReorderBufferTXN *txn;
1977 
1978  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1979  false);
1980  if (txn == NULL)
1981  return false;
1982 
1983  return txn->has_catalog_changes;
1984 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1959 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1961 {
1962  ReorderBufferTXN *txn;
1963 
1964  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1965 
1966  txn->has_catalog_changes = true;
1967 }
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 3241 of file reorderbuffer.c.

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy, ItemPointerGetBlockNumber, MAIN_FORKNUM, NULL, ReorderBufferTupleCidKey::relnode, HeapTupleData::t_self, HeapTupleData::t_tableOid, ReorderBufferTupleCidKey::tid, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

3245 {
3248  ForkNumber forkno;
3249  BlockNumber blockno;
3250  bool updated_mapping = false;
3251 
3252  /* be careful about padding */
3253  memset(&key, 0, sizeof(key));
3254 
3256 
3257  /*
3258  * get relfilenode from the buffer, no convenient way to access it other
3259  * than that.
3260  */
3261  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262 
3263  /* tuples can only be in the main fork */
3264  Assert(forkno == MAIN_FORKNUM);
3265  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266 
3267  ItemPointerCopy(&htup->t_self,
3268  &key.tid);
3269 
3270 restart:
3271  ent = (ReorderBufferTupleCidEnt *)
3272  hash_search(tuplecid_data,
3273  (void *) &key,
3274  HASH_FIND,
3275  NULL);
3276 
3277  /*
3278  * failed to find a mapping, check whether the table was rewritten and
3279  * apply mapping if so, but only do that once - there can be no new
3280  * mappings while we are in here since we have to hold a lock on the
3281  * relation.
3282  */
3283  if (ent == NULL && !updated_mapping)
3284  {
3285  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286  /* now check but don't update for a mapping again */
3287  updated_mapping = true;
3288  goto restart;
3289  }
3290  else if (ent == NULL)
3291  return false;
3292 
3293  if (cmin)
3294  *cmin = ent->cmin;
3295  if (cmax)
3296  *cmax = ent->cmax;
3297  return true;
3298 }
uint32 BlockNumber
Definition: block.h:31
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66
ForkNumber
Definition: relpath.h:24
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
#define BufferIsLocal(buffer)
Definition: buf.h:37
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
void StartupReorderBuffer ( void  )

Definition at line 2604 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, NULL, PANIC, ReadDir(), ReplicationSlotValidateName(), and unlink().

Referenced by StartupXLOG().

2605 {
2606  DIR *logical_dir;
2607  struct dirent *logical_de;
2608 
2609  DIR *spill_dir;
2610  struct dirent *spill_de;
2611 
2612  logical_dir = AllocateDir("pg_replslot");
2613  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614  {
2615  struct stat statbuf;
2616  char path[MAXPGPATH];
2617 
2618  if (strcmp(logical_de->d_name, ".") == 0 ||
2619  strcmp(logical_de->d_name, "..") == 0)
2620  continue;
2621 
2622  /* if it cannot be a slot, skip the directory */
2623  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624  continue;
2625 
2626  /*
2627  * ok, has to be a surviving logical slot, iterate and delete
2628  * everything starting with xid-*
2629  */
2630  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 
2632  /* we're only creating directories here, skip if it's not our's */
2633  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634  continue;
2635 
2636  spill_dir = AllocateDir(path);
2637  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638  {
2639  if (strcmp(spill_de->d_name, ".") == 0 ||
2640  strcmp(spill_de->d_name, "..") == 0)
2641  continue;
2642 
2643  /* only look at names that can be ours */
2644  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645  {
2646  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647  spill_de->d_name);
2648 
2649  if (unlink(path) != 0)
2650  ereport(PANIC,
2652  errmsg("could not remove file \"%s\": %m",
2653  path)));
2654  }
2655  }
2656  FreeDir(spill_dir);
2657  }
2658  FreeDir(logical_dir);
2659 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:173
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2298
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2364
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define lstat(path, sb)
Definition: win32.h:272
int FreeDir(DIR *dir)
Definition: fd.c:2407
static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static

Definition at line 3125 of file reorderbuffer.c.

References NULL, and xidComparator().

Referenced by UpdateLogicalMappings().

3126 {
3127  return bsearch(&xid, xip, num,
3128  sizeof(TransactionId), xidComparator) != NULL;
3129 }
uint32 TransactionId
Definition: c.h:397
#define NULL
Definition: c.h:229
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
static void UpdateLogicalMappings ( HTAB tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 3152 of file reorderbuffer.c.

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_length(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, NULL, palloc(), pfree(), qsort, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), and TransactionIdInArray().

Referenced by ResolveCminCmaxDuringDecoding().

3153 {
3154  DIR *mapping_dir;
3155  struct dirent *mapping_de;
3156  List *files = NIL;
3157  ListCell *file;
3158  RewriteMappingFile **files_a;
3159  size_t off;
3160  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161 
3162  mapping_dir = AllocateDir("pg_logical/mappings");
3163  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164  {
3165  Oid f_dboid;
3166  Oid f_relid;
3167  TransactionId f_mapped_xid;
3168  TransactionId f_create_xid;
3169  XLogRecPtr f_lsn;
3170  uint32 f_hi,
3171  f