PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Typedefs

typedef struct
ReorderBufferTXNByIdEnt 
ReorderBufferTXNByIdEnt
 
typedef struct
ReorderBufferTupleCidKey 
ReorderBufferTupleCidKey
 
typedef struct
ReorderBufferTupleCidEnt 
ReorderBufferTupleCidEnt
 
typedef struct
ReorderBufferIterTXNEntry 
ReorderBufferIterTXNEntry
 
typedef struct
ReorderBufferIterTXNState 
ReorderBufferIterTXNState
 
typedef struct
ReorderBufferToastEnt 
ReorderBufferToastEnt
 
typedef struct
ReorderBufferDiskChange 
ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static ReorderBufferIterTXNStateReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCheckSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change)
 
ReorderBufferTupleBufReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const void *a_p, const void *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

static const Size max_changes_in_memory = 4096
 
static const Size max_cached_tuplebufs = 4096 * 2
 

Typedef Documentation

Function Documentation

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 3032 of file reorderbuffer.c.

References Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy, MAXPGPATH, LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, NULL, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferTupleCidKey::relnode, ReorderBufferTupleCidKey::tid, and WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ.

Referenced by UpdateLogicalMappings().

3033 {
3034  char path[MAXPGPATH];
3035  int fd;
3036  int readBytes;
3038 
3039  sprintf(path, "pg_logical/mappings/%s", fname);
3040  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041  if (fd < 0)
3042  ereport(ERROR,
3044  errmsg("could not open file \"%s\": %m", path)));
3045 
3046  while (true)
3047  {
3050  ReorderBufferTupleCidEnt *new_ent;
3051  bool found;
3052 
3053  /* be careful about padding */
3054  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055 
3056  /* read all mappings till the end of the file */
3058  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3060 
3061  if (readBytes < 0)
3062  ereport(ERROR,
3064  errmsg("could not read file \"%s\": %m",
3065  path)));
3066  else if (readBytes == 0) /* EOF */
3067  break;
3068  else if (readBytes != sizeof(LogicalRewriteMappingData))
3069  ereport(ERROR,
3071  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072  path, readBytes,
3073  (int32) sizeof(LogicalRewriteMappingData))));
3074 
3075  key.relnode = map.old_node;
3076  ItemPointerCopy(&map.old_tid,
3077  &key.tid);
3078 
3079 
3080  ent = (ReorderBufferTupleCidEnt *)
3081  hash_search(tuplecid_data,
3082  (void *) &key,
3083  HASH_FIND,
3084  NULL);
3085 
3086  /* no existing mapping, no need to update */
3087  if (!ent)
3088  continue;
3089 
3090  key.relnode = map.new_node;
3091  ItemPointerCopy(&map.new_tid,
3092  &key.tid);
3093 
3094  new_ent = (ReorderBufferTupleCidEnt *)
3095  hash_search(tuplecid_data,
3096  (void *) &key,
3097  HASH_ENTER,
3098  &found);
3099 
3100  if (found)
3101  {
3102  /*
3103  * Make sure the existing mapping makes sense. We sometime update
3104  * old records that did not yet have a cmax (e.g. pg_class' own
3105  * entry while rewriting it) during rewrites, so allow that.
3106  */
3107  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109  }
3110  else
3111  {
3112  /* update mapping */
3113  new_ent->cmin = ent->cmin;
3114  new_ent->cmax = ent->cmax;
3115  new_ent->combocid = ent->combocid;
3116  }
3117  }
3118 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1039
signed int int32
Definition: c.h:256
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1235
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvalidCommandId
Definition: c.h:414
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1211
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define read(a, b, c)
Definition: win32.h:13
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139
ItemPointerData old_tid
Definition: rewriteheap.h:39
static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 650 of file reorderbuffer.c.

References Assert, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by ReorderBufferGetOldestTXN(), and ReorderBufferTXNByXid().

651 {
652 #ifdef USE_ASSERT_CHECKING
653  dlist_iter iter;
654  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
655 
656  dlist_foreach(iter, &rb->toplevel_by_lsn)
657  {
658  ReorderBufferTXN *cur_txn;
659 
660  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
661  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
662 
663  if (cur_txn->end_lsn != InvalidXLogRecPtr)
664  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
665 
666  if (prev_first_lsn != InvalidXLogRecPtr)
667  Assert(prev_first_lsn < cur_txn->first_lsn);
668 
669  Assert(!cur_txn->is_known_as_subxact);
670  prev_first_lsn = cur_txn->first_lsn;
671  }
672 #endif
673 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:161
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
XLogRecPtr end_lsn
static int file_sort_by_lsn ( const void *  a_p,
const void *  b_p 
)
static

Definition at line 3135 of file reorderbuffer.c.

References RewriteMappingFile::lsn.

Referenced by UpdateLogicalMappings().

3136 {
3137  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139 
3140  if (a->lsn < b->lsn)
3141  return -1;
3142  else if (a->lsn > b->lsn)
3143  return 1;
3144  return 0;
3145 }
void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1682 of file reorderbuffer.c.

References ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, NULL, ReorderBufferCleanupTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeAbort().

1683 {
1684  ReorderBufferTXN *txn;
1685 
1686  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1687  false);
1688 
1689  /* unknown, nothing to remove */
1690  if (txn == NULL)
1691  return;
1692 
1693  /* cosmetic... */
1694  txn->final_lsn = lsn;
1695 
1696  /* remove potential on-disk data, and deallocate */
1697  ReorderBufferCleanupTXN(rb, txn);
1698 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 1708 of file reorderbuffer.c.

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, ReorderBufferCleanupTXN(), ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by DecodeStandbyOp().

1709 {
1710  dlist_mutable_iter it;
1711 
1712  /*
1713  * Iterate through all (potential) toplevel TXNs and abort all that are
1714  * older than what possibly can be running. Once we've found the first
1715  * that is alive we stop, there might be some that acquired an xid earlier
1716  * but started writing later, but it's unlikely and they will cleaned up
1717  * in a later call to ReorderBufferAbortOld().
1718  */
1720  {
1721  ReorderBufferTXN *txn;
1722 
1723  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1724 
1725  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1726  {
1727  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1728 
1729  /* remove potential on-disk data, and deallocate this tx */
1730  ReorderBufferCleanupTXN(rb, txn);
1731  }
1732  else
1733  return;
1734  }
1735 }
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head toplevel_by_lsn
TransactionId xid
#define elog
Definition: elog.h:219
void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 1920 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, elog, ERROR, ReorderBufferTXN::invalidations, MemoryContextAlloc(), ReorderBufferTXN::ninvalidations, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1923 {
1924  ReorderBufferTXN *txn;
1925 
1926  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1927 
1928  if (txn->ninvalidations != 0)
1929  elog(ERROR, "only ever add one set of invalidations");
1930 
1931  Assert(nmsgs > 0);
1932 
1933  txn->ninvalidations = nmsgs;
1936  sizeof(SharedInvalidationMessage) * nmsgs);
1937  memcpy(txn->invalidations, msgs,
1938  sizeof(SharedInvalidationMessage) * nmsgs);
1939 }
#define ERROR
Definition: elog.h:43
MemoryContext context
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
#define elog
Definition: elog.h:219
void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 1876 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferGetChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

1878 {
1880 
1881  change->data.command_id = cid;
1883 
1884  ReorderBufferQueueChange(rb, xid, lsn, change);
1885 }
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileNode  node,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 1892 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tuplecid, and ReorderBufferTXN::tuplecids.

Referenced by SnapBuildProcessNewCid().

1896 {
1898  ReorderBufferTXN *txn;
1899 
1900  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1901 
1902  change->data.tuplecid.node = node;
1903  change->data.tuplecid.tid = tid;
1904  change->data.tuplecid.cmin = cmin;
1905  change->data.tuplecid.cmax = cmax;
1906  change->data.tuplecid.combocid = combocid;
1907  change->lsn = lsn;
1909 
1910  dlist_push_tail(&txn->tuplecids, &change->node);
1911  txn->ntuplecids++;
1912 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
struct ReorderBufferChange::@93::@96 tuplecid
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
dlist_head tuplecids
void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1836 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferGetChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeNewCatalogSnapshot().

1838 {
1840 
1841  change->data.snapshot = snap;
1843 
1844  ReorderBufferQueueChange(rb, xid, lsn, change);
1845 }
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
ReorderBuffer* ReorderBufferAllocate ( void  )

Definition at line 223 of file reorderbuffer.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), buffer, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::cached_tuplebufs, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, dlist_init(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, NULL, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), slist_init(), ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txn_context.

Referenced by StartupDecodingContext().

224 {
226  HASHCTL hash_ctl;
227  MemoryContext new_ctx;
228 
229  /* allocate memory in own context, to have better accountability */
231  "ReorderBuffer",
233 
234  buffer =
235  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
236 
237  memset(&hash_ctl, 0, sizeof(hash_ctl));
238 
239  buffer->context = new_ctx;
240 
241  buffer->change_context = SlabContextCreate(new_ctx,
242  "Change",
244  sizeof(ReorderBufferChange));
245 
246  buffer->txn_context = SlabContextCreate(new_ctx,
247  "TXN",
249  sizeof(ReorderBufferTXN));
250 
251  hash_ctl.keysize = sizeof(TransactionId);
252  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
253  hash_ctl.hcxt = buffer->context;
254 
255  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
257 
259  buffer->by_txn_last_txn = NULL;
260 
261  buffer->nr_cached_tuplebufs = 0;
262 
263  buffer->outbuf = NULL;
264  buffer->outbufsize = 0;
265 
267 
268  dlist_init(&buffer->toplevel_by_lsn);
269  slist_init(&buffer->cached_tuplebufs);
270 
271  return buffer;
272 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:397
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
Size entrysize
Definition: hsearch.h:73
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
MemoryContext change_context
static void slist_init(slist_head *head)
Definition: ilist.h:554
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define InvalidTransactionId
Definition: transam.h:31
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
Size keysize
Definition: hsearch.h:72
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:229
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:194
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
MemoryContext txn_context
slist_head cached_tuplebufs
void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 699 of file reorderbuffer.c.

References Assert, dlist_delete(), dlist_push_tail(), elog, ERROR, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeXactOp().

701 {
702  ReorderBufferTXN *txn;
703  ReorderBufferTXN *subtxn;
704  bool new_top;
705  bool new_sub;
706 
707  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
708  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
709 
710  if (new_sub)
711  {
712  /*
713  * we assign subtransactions to top level transaction even if we don't
714  * have data for it yet, assignment records frequently reference xids
715  * that have not yet produced any records. Knowing those aren't top
716  * level xids allows us to make processing cheaper in some places.
717  */
718  dlist_push_tail(&txn->subtxns, &subtxn->node);
719  txn->nsubtxns++;
720  }
721  else if (!subtxn->is_known_as_subxact)
722  {
723  subtxn->is_known_as_subxact = true;
724  Assert(subtxn->nsubtxns == 0);
725 
726  /* remove from lsn order list of top-level transactions */
727  dlist_delete(&subtxn->node);
728 
729  /* add to toplevel transaction */
730  dlist_push_tail(&txn->subtxns, &subtxn->node);
731  txn->nsubtxns++;
732  }
733  else if (new_top)
734  {
735  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
736  }
737 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define ERROR
Definition: elog.h:43
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define Assert(condition)
Definition: c.h:676
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1156 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, ReorderBufferTXN::has_catalog_changes, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FIND, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy, HASHCTL::keysize, ReorderBufferTXN::ntuplecids, ReorderBufferTupleCidKey::relnode, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTupleCidKey::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferCommit().

1157 {
1158  dlist_iter iter;
1159  HASHCTL hash_ctl;
1160 
1161  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1162  return;
1163 
1164  memset(&hash_ctl, 0, sizeof(hash_ctl));
1165 
1166  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1167  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1168  hash_ctl.hcxt = rb->context;
1169 
1170  /*
1171  * create the hash with the exact number of to-be-stored tuplecids from
1172  * the start
1173  */
1174  txn->tuplecid_hash =
1175  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1177 
1178  dlist_foreach(iter, &txn->tuplecids)
1179  {
1182  bool found;
1183  ReorderBufferChange *change;
1184 
1185  change = dlist_container(ReorderBufferChange, node, iter.cur);
1186 
1188 
1189  /* be careful about padding */
1190  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1191 
1192  key.relnode = change->data.tuplecid.node;
1193 
1194  ItemPointerCopy(&change->data.tuplecid.tid,
1195  &key.tid);
1196 
1197  ent = (ReorderBufferTupleCidEnt *)
1199  (void *) &key,
1201  &found);
1202  if (!found)
1203  {
1204  ent->cmin = change->data.tuplecid.cmin;
1205  ent->cmax = change->data.tuplecid.cmax;
1206  ent->combocid = change->data.tuplecid.combocid;
1207  }
1208  else
1209  {
1210  Assert(ent->cmin == change->data.tuplecid.cmin);
1211  Assert(ent->cmax == InvalidCommandId ||
1212  ent->cmax == change->data.tuplecid.cmax);
1213 
1214  /*
1215  * if the tuple got valid in this transaction and now got deleted
1216  * we already have a valid cmin stored. The cmax will be
1217  * InvalidCommandId though.
1218  */
1219  ent->cmax = change->data.tuplecid.cmax;
1220  }
1221  }
1222 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
union ReorderBufferChange::@93 data
Size entrysize
Definition: hsearch.h:73
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
#define InvalidCommandId
Definition: c.h:414
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
struct ReorderBufferChange::@93::@96 tuplecid
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
#define Assert(condition)
Definition: c.h:676
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139
static void ReorderBufferCheckSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2037 of file reorderbuffer.c.

References Assert, max_changes_in_memory, ReorderBufferTXN::nentries_mem, and ReorderBufferSerializeTXN().

Referenced by ReorderBufferQueueChange().

2038 {
2039  /*
2040  * TODO: improve accounting so we cheaply can take subtransactions into
2041  * account here.
2042  */
2043  if (txn->nentries_mem >= max_changes_in_memory)
2044  {
2045  ReorderBufferSerializeTXN(rb, txn);
2046  Assert(txn->nentries_mem == 0);
2047  }
2048 }
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define Assert(condition)
Definition: c.h:676
static const Size max_changes_in_memory
static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1073 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBuffer::by_txn, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), ReorderBufferTXN::serialized, SnapBuildSnapDecRefcount(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCommit(), and ReorderBufferForget().

1074 {
1075  bool found;
1076  dlist_mutable_iter iter;
1077 
1078  /* cleanup subtransactions & their changes */
1079  dlist_foreach_modify(iter, &txn->subtxns)
1080  {
1081  ReorderBufferTXN *subtxn;
1082 
1083  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1084 
1085  /*
1086  * Subtransactions are always associated to the toplevel TXN, even if
1087  * they originally were happening inside another subtxn, so we won't
1088  * ever recurse more than one level deep here.
1089  */
1090  Assert(subtxn->is_known_as_subxact);
1091  Assert(subtxn->nsubtxns == 0);
1092 
1093  ReorderBufferCleanupTXN(rb, subtxn);
1094  }
1095 
1096  /* cleanup changes in the toplevel txn */
1097  dlist_foreach_modify(iter, &txn->changes)
1098  {
1099  ReorderBufferChange *change;
1100 
1101  change = dlist_container(ReorderBufferChange, node, iter.cur);
1102 
1103  ReorderBufferReturnChange(rb, change);
1104  }
1105 
1106  /*
1107  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1108  * They are always stored in the toplevel transaction.
1109  */
1110  dlist_foreach_modify(iter, &txn->tuplecids)
1111  {
1112  ReorderBufferChange *change;
1113 
1114  change = dlist_container(ReorderBufferChange, node, iter.cur);
1116  ReorderBufferReturnChange(rb, change);
1117  }
1118 
1119  if (txn->base_snapshot != NULL)
1120  {
1122  txn->base_snapshot = NULL;
1124  }
1125 
1126  /*
1127  * Remove TXN from its containing list.
1128  *
1129  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1130  * parent's list of known subxacts; this leaves the parent's nsubxacts
1131  * count too high, but we don't care. Otherwise, we are deleting the TXN
1132  * from the LSN-ordered list of toplevel TXNs.
1133  */
1134  dlist_delete(&txn->node);
1135 
1136  /* now remove reference from buffer */
1137  hash_search(rb->by_txn,
1138  (void *) &txn->xid,
1139  HASH_REMOVE,
1140  &found);
1141  Assert(found);
1142 
1143  /* remove entries spilled to disk */
1144  if (txn->serialized)
1145  ReorderBufferRestoreCleanup(rb, txn);
1146 
1147  /* deallocate */
1148  ReorderBufferReturnTXN(rb, txn);
1149 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
XLogRecPtr base_snapshot_lsn
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
dlist_head subtxns
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_head tuplecids
void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 1311 of file reorderbuffer.c.

References AbortCurrentTransaction(), ReorderBufferChange::action, ReorderBuffer::apply_change, Assert, ReorderBufferTXN::base_snapshot, ReorderBuffer::begin, BeginInternalSubTransaction(), ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::commit_time, SnapshotData::copied, SnapshotData::curcid, ReorderBufferChange::data, dlist_delete(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, FirstCommandId, GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, ReorderBuffer::message, ReorderBufferChange::msg, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, NULL, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelidByRelfilenode(), RELKIND_SEQUENCE, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferReturnChange(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTXNByXid(), RollbackAndReleaseCurrentSubTransaction(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, StartTransactionCommand(), TeardownHistoricSnapshot(), ReorderBufferChange::tp, and ReorderBufferTXN::tuplecid_hash.

Referenced by DecodeCommit().

1315 {
1316  ReorderBufferTXN *txn;
1317  volatile Snapshot snapshot_now;
1318  volatile CommandId command_id = FirstCommandId;
1319  bool using_subtxn;
1320  ReorderBufferIterTXNState *volatile iterstate = NULL;
1321 
1322  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1323  false);
1324 
1325  /* unknown transaction, nothing to replay */
1326  if (txn == NULL)
1327  return;
1328 
1329  txn->final_lsn = commit_lsn;
1330  txn->end_lsn = end_lsn;
1331  txn->commit_time = commit_time;
1332  txn->origin_id = origin_id;
1333  txn->origin_lsn = origin_lsn;
1334 
1335  /*
1336  * If this transaction didn't have any real changes in our database, it's
1337  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1338  * transferred its snapshot to this transaction if it had one and the
1339  * toplevel tx didn't.
1340  */
1341  if (txn->base_snapshot == NULL)
1342  {
1343  Assert(txn->ninvalidations == 0);
1344  ReorderBufferCleanupTXN(rb, txn);
1345  return;
1346  }
1347 
1348  snapshot_now = txn->base_snapshot;
1349 
1350  /* build data to be able to lookup the CommandIds of catalog tuples */
1352 
1353  /* setup the initial snapshot */
1354  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1355 
1356  /*
1357  * Decoding needs access to syscaches et al., which in turn use
1358  * heavyweight locks and such. Thus we need to have enough state around to
1359  * keep track of those. The easiest way is to simply use a transaction
1360  * internally. That also allows us to easily enforce that nothing writes
1361  * to the database by checking for xid assignments.
1362  *
1363  * When we're called via the SQL SRF there's already a transaction
1364  * started, so start an explicit subtransaction there.
1365  */
1366  using_subtxn = IsTransactionOrTransactionBlock();
1367 
1368  PG_TRY();
1369  {
1370  ReorderBufferChange *change;
1371  ReorderBufferChange *specinsert = NULL;
1372 
1373  if (using_subtxn)
1374  BeginInternalSubTransaction("replay");
1375  else
1377 
1378  rb->begin(rb, txn);
1379 
1380  iterstate = ReorderBufferIterTXNInit(rb, txn);
1381  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1382  {
1383  Relation relation = NULL;
1384  Oid reloid;
1385 
1386  switch (change->action)
1387  {
1389 
1390  /*
1391  * Confirmation for speculative insertion arrived. Simply
1392  * use as a normal record. It'll be cleaned up at the end
1393  * of INSERT processing.
1394  */
1395  Assert(specinsert->data.tp.oldtuple == NULL);
1396  change = specinsert;
1398 
1399  /* intentionally fall through */
1403  Assert(snapshot_now);
1404 
1405  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1406  change->data.tp.relnode.relNode);
1407 
1408  /*
1409  * Catalog tuple without data, emitted while catalog was
1410  * in the process of being rewritten.
1411  */
1412  if (reloid == InvalidOid &&
1413  change->data.tp.newtuple == NULL &&
1414  change->data.tp.oldtuple == NULL)
1415  goto change_done;
1416  else if (reloid == InvalidOid)
1417  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1418  relpathperm(change->data.tp.relnode,
1419  MAIN_FORKNUM));
1420 
1421  relation = RelationIdGetRelation(reloid);
1422 
1423  if (relation == NULL)
1424  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1425  reloid,
1426  relpathperm(change->data.tp.relnode,
1427  MAIN_FORKNUM));
1428 
1429  if (!RelationIsLogicallyLogged(relation))
1430  goto change_done;
1431 
1432  /*
1433  * For now ignore sequence changes entirely. Most of the
1434  * time they don't log changes using records we
1435  * understand, so it doesn't make sense to handle the few
1436  * cases we do.
1437  */
1438  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1439  goto change_done;
1440 
1441  /* user-triggered change */
1442  if (!IsToastRelation(relation))
1443  {
1444  ReorderBufferToastReplace(rb, txn, relation, change);
1445  rb->apply_change(rb, txn, relation, change);
1446 
1447  /*
1448  * Only clear reassembled toast chunks if we're sure
1449  * they're not required anymore. The creator of the
1450  * tuple tells us.
1451  */
1452  if (change->data.tp.clear_toast_afterwards)
1453  ReorderBufferToastReset(rb, txn);
1454  }
1455  /* we're not interested in toast deletions */
1456  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1457  {
1458  /*
1459  * Need to reassemble the full toasted Datum in
1460  * memory, to ensure the chunks don't get reused till
1461  * we're done remove it from the list of this
1462  * transaction's changes. Otherwise it will get
1463  * freed/reused while restoring spooled data from
1464  * disk.
1465  */
1466  dlist_delete(&change->node);
1467  ReorderBufferToastAppendChunk(rb, txn, relation,
1468  change);
1469  }
1470 
1471  change_done:
1472 
1473  /*
1474  * Either speculative insertion was confirmed, or it was
1475  * unsuccessful and the record isn't needed anymore.
1476  */
1477  if (specinsert != NULL)
1478  {
1479  ReorderBufferReturnChange(rb, specinsert);
1480  specinsert = NULL;
1481  }
1482 
1483  if (relation != NULL)
1484  {
1485  RelationClose(relation);
1486  relation = NULL;
1487  }
1488  break;
1489 
1491 
1492  /*
1493  * Speculative insertions are dealt with by delaying the
1494  * processing of the insert until the confirmation record
1495  * arrives. For that we simply unlink the record from the
1496  * chain, so it does not get freed/reused while restoring
1497  * spooled data from disk.
1498  *
1499  * This is safe in the face of concurrent catalog changes
1500  * because the relevant relation can't be changed between
1501  * speculative insertion and confirmation due to
1502  * CheckTableNotInUse() and locking.
1503  */
1504 
1505  /* clear out a pending (and thus failed) speculation */
1506  if (specinsert != NULL)
1507  {
1508  ReorderBufferReturnChange(rb, specinsert);
1509  specinsert = NULL;
1510  }
1511 
1512  /* and memorize the pending insertion */
1513  dlist_delete(&change->node);
1514  specinsert = change;
1515  break;
1516 
1518  rb->message(rb, txn, change->lsn, true,
1519  change->data.msg.prefix,
1520  change->data.msg.message_size,
1521  change->data.msg.message);
1522  break;
1523 
1525  /* get rid of the old */
1526  TeardownHistoricSnapshot(false);
1527 
1528  if (snapshot_now->copied)
1529  {
1530  ReorderBufferFreeSnap(rb, snapshot_now);
1531  snapshot_now =
1532  ReorderBufferCopySnap(rb, change->data.snapshot,
1533  txn, command_id);
1534  }
1535 
1536  /*
1537  * Restored from disk, need to be careful not to double
1538  * free. We could introduce refcounting for that, but for
1539  * now this seems infrequent enough not to care.
1540  */
1541  else if (change->data.snapshot->copied)
1542  {
1543  snapshot_now =
1544  ReorderBufferCopySnap(rb, change->data.snapshot,
1545  txn, command_id);
1546  }
1547  else
1548  {
1549  snapshot_now = change->data.snapshot;
1550  }
1551 
1552 
1553  /* and continue with the new one */
1554  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1555  break;
1556 
1558  Assert(change->data.command_id != InvalidCommandId);
1559 
1560  if (command_id < change->data.command_id)
1561  {
1562  command_id = change->data.command_id;
1563 
1564  if (!snapshot_now->copied)
1565  {
1566  /* we don't use the global one anymore */
1567  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1568  txn, command_id);
1569  }
1570 
1571  snapshot_now->curcid = command_id;
1572 
1573  TeardownHistoricSnapshot(false);
1574  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1575 
1576  /*
1577  * Every time the CommandId is incremented, we could
1578  * see new catalog contents, so execute all
1579  * invalidations.
1580  */
1582  }
1583 
1584  break;
1585 
1587  elog(ERROR, "tuplecid value in changequeue");
1588  break;
1589  }
1590  }
1591 
1592  /*
1593  * There's a speculative insertion remaining, just clean in up, it
1594  * can't have been successful, otherwise we'd gotten a confirmation
1595  * record.
1596  */
1597  if (specinsert)
1598  {
1599  ReorderBufferReturnChange(rb, specinsert);
1600  specinsert = NULL;
1601  }
1602 
1603  /* clean up the iterator */
1604  ReorderBufferIterTXNFinish(rb, iterstate);
1605  iterstate = NULL;
1606 
1607  /* call commit callback */
1608  rb->commit(rb, txn, commit_lsn);
1609 
1610  /* this is just a sanity check against bad output plugin behaviour */
1612  elog(ERROR, "output plugin used XID %u",
1614 
1615  /* cleanup */
1616  TeardownHistoricSnapshot(false);
1617 
1618  /*
1619  * Aborting the current (sub-)transaction as a whole has the right
1620  * semantics. We want all locks acquired in here to be released, not
1621  * reassigned to the parent and we do not want any database access
1622  * have persistent effects.
1623  */
1625 
1626  /* make sure there's no cache pollution */
1628 
1629  if (using_subtxn)
1631 
1632  if (snapshot_now->copied)
1633  ReorderBufferFreeSnap(rb, snapshot_now);
1634 
1635  /* remove potential on-disk data, and deallocate */
1636  ReorderBufferCleanupTXN(rb, txn);
1637  }
1638  PG_CATCH();
1639  {
1640  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1641  if (iterstate)
1642  ReorderBufferIterTXNFinish(rb, iterstate);
1643 
1645 
1646  /*
1647  * Force cache invalidation to happen outside of a valid transaction
1648  * to prevent catalog access as we just caught an error.
1649  */
1651 
1652  /* make sure there's no cache pollution */
1654 
1655  if (using_subtxn)
1657 
1658  if (snapshot_now->copied)
1659  ReorderBufferFreeSnap(rb, snapshot_now);
1660 
1661  /* remove potential on-disk data, and deallocate */
1662  ReorderBufferCleanupTXN(rb, txn);
1663 
1664  PG_RE_THROW();
1665  }
1666  PG_END_TRY();
1667 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
uint32 CommandId
Definition: c.h:411
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
void AbortCurrentTransaction(void)
Definition: xact.c:2989
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
bool copied
Definition: snapshot.h:94
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@93 data
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4346
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
ReorderBufferCommitCB commit
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:575
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
XLogRecPtr origin_lsn
#define FirstCommandId
Definition: c.h:413
#define ERROR
Definition: elog.h:43
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
struct ReorderBufferChange::@93::@94 tp
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4160
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:434
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr final_lsn
void RelationClose(Relation relation)
Definition: relcache.c:2164
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define InvalidCommandId
Definition: c.h:414
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
#define InvalidOid
Definition: postgres_ext.h:36
CommandId curcid
Definition: snapshot.h:96
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2680
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4056
#define PG_RE_THROW()
Definition: elog.h:314
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
struct ReorderBufferChange::@93::@95 msg
#define elog
Definition: elog.h:219
ReorderBufferBeginCB begin
#define PG_TRY()
Definition: elog.h:284
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2075
#define PG_END_TRY()
Definition: elog.h:300
void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 744 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_delete(), dlist_push_tail(), elog, ReorderBufferTXN::end_lsn, ERROR, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, NULL, ReorderBufferTXNByXid(), and ReorderBufferTXN::subtxns.

Referenced by DecodeCommit().

747 {
748  ReorderBufferTXN *txn;
749  ReorderBufferTXN *subtxn;
750 
751  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
752  InvalidXLogRecPtr, false);
753 
754  /*
755  * No need to do anything if that subtxn didn't contain any changes
756  */
757  if (!subtxn)
758  return;
759 
760  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
761 
762  if (txn == NULL)
763  elog(ERROR, "subxact logged without previous toplevel record");
764 
765  /*
766  * Pass our base snapshot to the parent transaction if it doesn't have
767  * one, or ours is older. That can happen if there are no changes in the
768  * toplevel transaction but in one of the child transactions. This allows
769  * the parent to simply use its base snapshot initially.
770  */
771  if (subtxn->base_snapshot != NULL &&
772  (txn->base_snapshot == NULL ||
773  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
774  {
775  txn->base_snapshot = subtxn->base_snapshot;
776  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
777  subtxn->base_snapshot = NULL;
779  }
780 
781  subtxn->final_lsn = commit_lsn;
782  subtxn->end_lsn = end_lsn;
783 
784  if (!subtxn->is_known_as_subxact)
785  {
786  subtxn->is_known_as_subxact = true;
787  Assert(subtxn->nsubtxns == 0);
788 
789  /* remove from lsn order list of top-level transactions */
790  dlist_delete(&subtxn->node);
791 
792  /* add to subtransaction list */
793  dlist_push_tail(&txn->subtxns, &subtxn->node);
794  txn->nsubtxns++;
795  }
796 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
XLogRecPtr base_snapshot_lsn
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
XLogRecPtr end_lsn
dlist_head subtxns
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define elog
Definition: elog.h:219
static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1230 of file reorderbuffer.c.

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferCommit().

1232 {
1233  Snapshot snap;
1234  dlist_iter iter;
1235  int i = 0;
1236  Size size;
1237 
1238  size = sizeof(SnapshotData) +
1239  sizeof(TransactionId) * orig_snap->xcnt +
1240  sizeof(TransactionId) * (txn->nsubtxns + 1);
1241 
1242  snap = MemoryContextAllocZero(rb->context, size);
1243  memcpy(snap, orig_snap, sizeof(SnapshotData));
1244 
1245  snap->copied = true;
1246  snap->active_count = 1; /* mark as active so nobody frees it */
1247  snap->regd_count = 0;
1248  snap->xip = (TransactionId *) (snap + 1);
1249 
1250  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1251 
1252  /*
1253  * snap->subxip contains all txids that belong to our transaction which we
1254  * need to check via cmin/cmax. That's why we store the toplevel
1255  * transaction in there as well.
1256  */
1257  snap->subxip = snap->xip + snap->xcnt;
1258  snap->subxip[i++] = txn->xid;
1259 
1260  /*
1261  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1262  * Since it's an upper boundary it is safe to use it for the allocation
1263  * above.
1264  */
1265  snap->subxcnt = 1;
1266 
1267  dlist_foreach(iter, &txn->subtxns)
1268  {
1269  ReorderBufferTXN *sub_txn;
1270 
1271  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1272  snap->subxip[i++] = sub_txn->xid;
1273  snap->subxcnt++;
1274  }
1275 
1276  /* sort so we can bsearch() later */
1277  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1278 
1279  /* store the specified current CommandId */
1280  snap->curcid = cid;
1281 
1282  return snap;
1283 }
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 regd_count
Definition: snapshot.h:108
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
struct SnapshotData SnapshotData
TransactionId * xip
Definition: snapshot.h:77
MemoryContext context
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
CommandId curcid
Definition: snapshot.h:96
size_t Size
Definition: c.h:356
dlist_head subtxns
uint32 xcnt
Definition: snapshot.h:78
int i
#define qsort(a, b, c, d)
Definition: port.h:443
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
static void ReorderBufferExecuteInvalidations ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1946 of file reorderbuffer.c.

References i, ReorderBufferTXN::invalidations, LocalExecuteInvalidationMessage(), and ReorderBufferTXN::ninvalidations.

Referenced by ReorderBufferCommit().

1947 {
1948  int i;
1949 
1950  for (i = 0; i < txn->ninvalidations; i++)
1952 }
SharedInvalidationMessage * invalidations
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i
void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1751 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, NULL, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

1752 {
1753  ReorderBufferTXN *txn;
1754 
1755  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1756  false);
1757 
1758  /* unknown, nothing to forget */
1759  if (txn == NULL)
1760  return;
1761 
1762  /* cosmetic... */
1763  txn->final_lsn = lsn;
1764 
1765  /*
1766  * Process cache invalidation messages if there are any. Even if we're not
1767  * interested in the transaction's contents, it could have manipulated the
1768  * catalog and we need to update the caches according to that.
1769  */
1770  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1772  txn->invalidations);
1773  else
1774  Assert(txn->ninvalidations == 0);
1775 
1776  /* remove potential on-disk data, and deallocate */
1777  ReorderBufferCleanupTXN(rb, txn);
1778 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
XLogRecPtr final_lsn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
SharedInvalidationMessage * invalidations
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 278 of file reorderbuffer.c.

References ReorderBuffer::context, and MemoryContextDelete().

Referenced by FreeDecodingContext().

279 {
280  MemoryContext context = rb->context;
281 
282  /*
283  * We free separately allocated data by entirely scrapping reorderbuffer's
284  * memory context.
285  */
286  MemoryContextDelete(context);
287 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
MemoryContext context
static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1289 of file reorderbuffer.c.

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCommit(), and ReorderBufferReturnChange().

1290 {
1291  if (snap->copied)
1292  pfree(snap);
1293  else
1295 }
bool copied
Definition: snapshot.h:94
void pfree(void *pointer)
Definition: mcxt.c:950
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
ReorderBufferChange* ReorderBufferGetChange ( ReorderBuffer rb)

Definition at line 346 of file reorderbuffer.c.

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

347 {
348  ReorderBufferChange *change;
349 
350  change = (ReorderBufferChange *)
352 
353  memset(change, 0, sizeof(ReorderBufferChange));
354  return change;
355 }
MemoryContext change_context
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
ReorderBufferTXN* ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 676 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBufferTXN::is_known_as_subxact, NULL, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

677 {
678  ReorderBufferTXN *txn;
679 
681  return NULL;
682 
683  AssertTXNLsnOrder(rb);
684 
686 
689  return txn;
690 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
dlist_head toplevel_by_lsn
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 415 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, Assert, ReorderBuffer::cached_tuplebufs, ReorderBuffer::context, MaxHeapTupleSize, MemoryContextAlloc(), ReorderBuffer::nr_cached_tuplebufs, ReorderBufferTupleBufData, SizeofHeapTupleHeader, slist_container, slist_pop_head_node(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

416 {
417  ReorderBufferTupleBuf *tuple;
418  Size alloc_len;
419 
420  alloc_len = tuple_len + SizeofHeapTupleHeader;
421 
422  /*
423  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
424  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
425  * generated for oldtuples can be bigger, as they don't have out-of-line
426  * toast columns.
427  */
428  if (alloc_len < MaxHeapTupleSize)
429  alloc_len = MaxHeapTupleSize;
430 
431 
432  /* if small enough, check the slab cache */
433  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
434  {
435  rb->nr_cached_tuplebufs--;
439 #ifdef USE_ASSERT_CHECKING
440  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
442 #endif
443  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
444 #ifdef USE_ASSERT_CHECKING
445  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
447 #endif
448  }
449  else
450  {
451  tuple = (ReorderBufferTupleBuf *)
453  sizeof(ReorderBufferTupleBuf) +
454  MAXIMUM_ALIGNOF + alloc_len);
455  tuple->alloc_tuple_size = alloc_len;
456  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
457  }
458 
459  return tuple;
460 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
HeapTupleHeader t_data
Definition: htup.h:67
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
#define Assert(condition)
Definition: c.h:676
size_t Size
Definition: c.h:356
Size nr_cached_tuplebufs
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
slist_head cached_tuplebufs
static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 293 of file reorderbuffer.c.

References ReorderBufferTXN::changes, dlist_init(), MemoryContextAlloc(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

294 {
295  ReorderBufferTXN *txn;
296 
297  txn = (ReorderBufferTXN *)
299 
300  memset(txn, 0, sizeof(ReorderBufferTXN));
301 
302  dlist_init(&txn->changes);
303  dlist_init(&txn->tuplecids);
304  dlist_init(&txn->subtxns);
305 
306  return txn;
307 }
dlist_head changes
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head subtxns
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
MemoryContext txn_context
dlist_head tuplecids
void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 1787 of file reorderbuffer.c.

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by DecodeStandbyOp(), and ReorderBufferForget().

1789 {
1790  bool use_subtxn = IsTransactionOrTransactionBlock();
1791  int i;
1792 
1793  if (use_subtxn)
1794  BeginInternalSubTransaction("replay");
1795 
1796  /*
1797  * Force invalidations to happen outside of a valid transaction - that way
1798  * entries will just be marked as invalid without accessing the catalog.
1799  * That's advantageous because we don't need to setup the full state
1800  * necessary for catalog access.
1801  */
1802  if (use_subtxn)
1804 
1805  for (i = 0; i < ninvalidations; i++)
1806  LocalExecuteInvalidationMessage(&invalidations[i]);
1807 
1808  if (use_subtxn)
1810 }
void AbortCurrentTransaction(void)
Definition: xact.c:2989
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4346
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4160
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4056
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
int i
static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 815 of file reorderbuffer.c.

References DatumGetInt32, ReorderBufferIterTXNState::entries, and ReorderBufferIterTXNEntry::lsn.

Referenced by ReorderBufferIterTXNInit().

816 {
818  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
819  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
820 
821  if (pos_a < pos_b)
822  return 1;
823  else if (pos_a == pos_b)
824  return 0;
825  return -1;
826 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define DatumGetInt32(X)
Definition: postgres.h:478
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
void * arg
static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1042 of file reorderbuffer.c.

References Assert, binaryheap_free(), CloseTransientFile(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferCommit().

1044 {
1045  int32 off;
1046 
1047  for (off = 0; off < state->nr_txns; off++)
1048  {
1049  if (state->entries[off].fd != -1)
1050  CloseTransientFile(state->entries[off].fd);
1051  }
1052 
1053  /* free memory we might have "leaked" in the last *Next call */
1054  if (!dlist_is_empty(&state->old_change))
1055  {
1056  ReorderBufferChange *change;
1057 
1058  change = dlist_container(ReorderBufferChange, node,
1059  dlist_pop_head_node(&state->old_change));
1060  ReorderBufferReturnChange(rb, change);
1061  Assert(dlist_is_empty(&state->old_change));
1062  }
1063 
1064  binaryheap_free(state->heap);
1065  pfree(state);
1066 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
signed int int32
Definition: c.h:256
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define Assert(condition)
Definition: c.h:676
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 833 of file reorderbuffer.c.

References binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, ReorderBufferIterTXNState::nr_txns, ReorderBufferIterTXNState::old_change, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferIterTXNEntry::segno, ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

834 {
835  Size nr_txns = 0;
837  dlist_iter cur_txn_i;
838  int32 off;
839 
840  /*
841  * Calculate the size of our heap: one element for every transaction that
842  * contains changes. (Besides the transactions already in the reorder
843  * buffer, we count the one we were directly passed.)
844  */
845  if (txn->nentries > 0)
846  nr_txns++;
847 
848  dlist_foreach(cur_txn_i, &txn->subtxns)
849  {
850  ReorderBufferTXN *cur_txn;
851 
852  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
853 
854  if (cur_txn->nentries > 0)
855  nr_txns++;
856  }
857 
858  /*
859  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
860  * need to allocate/build a heap then.
861  */
862 
863  /* allocate iteration state */
864  state = (ReorderBufferIterTXNState *)
866  sizeof(ReorderBufferIterTXNState) +
867  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
868 
869  state->nr_txns = nr_txns;
870  dlist_init(&state->old_change);
871 
872  for (off = 0; off < state->nr_txns; off++)
873  {
874  state->entries[off].fd = -1;
875  state->entries[off].segno = 0;
876  }
877 
878  /* allocate heap */
879  state->heap = binaryheap_allocate(state->nr_txns,
881  state);
882 
883  /*
884  * Now insert items into the binary heap, in an unordered fashion. (We
885  * will run a heap assembly step at the end; this is more efficient.)
886  */
887 
888  off = 0;
889 
890  /* add toplevel transaction if it contains changes */
891  if (txn->nentries > 0)
892  {
893  ReorderBufferChange *cur_change;
894 
895  if (txn->serialized)
896  {
897  /* serialize remaining changes */
898  ReorderBufferSerializeTXN(rb, txn);
899  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
900  &state->entries[off].segno);
901  }
902 
903  cur_change = dlist_head_element(ReorderBufferChange, node,
904  &txn->changes);
905 
906  state->entries[off].lsn = cur_change->lsn;
907  state->entries[off].change = cur_change;
908  state->entries[off].txn = txn;
909 
911  }
912 
913  /* add subtransactions if they contain changes */
914  dlist_foreach(cur_txn_i, &txn->subtxns)
915  {
916  ReorderBufferTXN *cur_txn;
917 
918  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
919 
920  if (cur_txn->nentries > 0)
921  {
922  ReorderBufferChange *cur_change;
923 
924  if (cur_txn->serialized)
925  {
926  /* serialize remaining changes */
927  ReorderBufferSerializeTXN(rb, cur_txn);
928  ReorderBufferRestoreChanges(rb, cur_txn,
929  &state->entries[off].fd,
930  &state->entries[off].segno);
931  }
932  cur_change = dlist_head_element(ReorderBufferChange, node,
933  &cur_txn->changes);
934 
935  state->entries[off].lsn = cur_change->lsn;
936  state->entries[off].change = cur_change;
937  state->entries[off].txn = cur_txn;
938 
940  }
941  }
942 
943  /* assemble a valid binary heap */
944  binaryheap_build(state->heap);
945 
946  return state;
947 }
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:256
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
Definition: regguts.h:298
size_t Size
Definition: c.h:356
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define Int32GetDatum(X)
Definition: postgres.h:485
static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 956 of file reorderbuffer.c.

References Assert, binaryheap::bh_size, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32, DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNState::entries, ReorderBufferIterTXNEntry::fd, ReorderBufferIterTXNState::heap, Int32GetDatum, ReorderBufferChange::lsn, ReorderBufferIterTXNEntry::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, NULL, ReorderBufferIterTXNState::old_change, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferIterTXNEntry::segno, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferCommit().

957 {
958  ReorderBufferChange *change;
960  int32 off;
961 
962  /* nothing there anymore */
963  if (state->heap->bh_size == 0)
964  return NULL;
965 
966  off = DatumGetInt32(binaryheap_first(state->heap));
967  entry = &state->entries[off];
968 
969  /* free memory we might have "leaked" in the previous *Next call */
970  if (!dlist_is_empty(&state->old_change))
971  {
972  change = dlist_container(ReorderBufferChange, node,
974  ReorderBufferReturnChange(rb, change);
975  Assert(dlist_is_empty(&state->old_change));
976  }
977 
978  change = entry->change;
979 
980  /*
981  * update heap with information about which transaction has the next
982  * relevant change in LSN order
983  */
984 
985  /* there are in-memory changes */
986  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
987  {
988  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
989  ReorderBufferChange *next_change =
991 
992  /* txn stays the same */
993  state->entries[off].lsn = next_change->lsn;
994  state->entries[off].change = next_change;
995 
997  return change;
998  }
999 
1000  /* try to load changes from disk */
1001  if (entry->txn->nentries != entry->txn->nentries_mem)
1002  {
1003  /*
1004  * Ugly: restoring changes will reuse *Change records, thus delete the
1005  * current one from the per-tx list and only free in the next call.
1006  */
1007  dlist_delete(&change->node);
1008  dlist_push_tail(&state->old_change, &change->node);
1009 
1010  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1011  &state->entries[off].segno))
1012  {
1013  /* successfully restored changes from disk */
1014  ReorderBufferChange *next_change =
1016  &entry->txn->changes);
1017 
1018  elog(DEBUG2, "restored %u/%u changes from disk",
1019  (uint32) entry->txn->nentries_mem,
1020  (uint32) entry->txn->nentries);
1021 
1022  Assert(entry->txn->nentries_mem);
1023  /* txn stays the same */
1024  state->entries[off].lsn = next_change->lsn;
1025  state->entries[off].change = next_change;
1027 
1028  return change;
1029  }
1030  }
1031 
1032  /* ok, no changes there anymore, remove */
1033  binaryheap_remove_first(state->heap);
1034 
1035  return change;
1036 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
static int32 next
Definition: blutils.c:210
#define DatumGetInt32(X)
Definition: postgres.h:478
ReorderBufferTXN * txn
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
signed int int32
Definition: c.h:256
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:268
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
int bh_size
Definition: binaryheap.h:32
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
ReorderBufferChange * change
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
#define Int32GetDatum(X)
Definition: postgres.h:485
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define elog
Definition: elog.h:219
void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1823 of file reorderbuffer.c.

References InvalidTransactionId, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeStandbyOp(), DecodeXactOp(), DecodeXLogOp(), and LogicalDecodingProcessRecord().

1824 {
1825  /* many records won't have an xid assigned, centralize check here */
1826  if (xid != InvalidTransactionId)
1827  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1828 }
#define InvalidTransactionId
Definition: transam.h:31
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change 
)

Definition at line 578 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, NULL, ReorderBufferCheckSerializeTXN(), and ReorderBufferTXNByXid().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

580 {
581  ReorderBufferTXN *txn;
582 
583  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584 
585  change->lsn = lsn;
586  Assert(InvalidXLogRecPtr != lsn);
587  dlist_push_tail(&txn->changes, &change->node);
588  txn->nentries++;
589  txn->nentries_mem++;
590 
592 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_head changes
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snapshot,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 598 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBuffer::message, ReorderBufferChange::msg, NULL, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by DecodeLogicalMsgOp().

602 {
603  if (transactional)
604  {
605  MemoryContext oldcontext;
606  ReorderBufferChange *change;
607 
609 
610  oldcontext = MemoryContextSwitchTo(rb->context);
611 
612  change = ReorderBufferGetChange(rb);
614  change->data.msg.prefix = pstrdup(prefix);
615  change->data.msg.message_size = message_size;
616  change->data.msg.message = palloc(message_size);
617  memcpy(change->data.msg.message, message, message_size);
618 
619  ReorderBufferQueueChange(rb, xid, lsn, change);
620 
621  MemoryContextSwitchTo(oldcontext);
622  }
623  else
624  {
625  ReorderBufferTXN *txn = NULL;
626  volatile Snapshot snapshot_now = snapshot;
627 
628  if (xid != InvalidTransactionId)
629  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
630 
631  /* setup snapshot to allow catalog access */
632  SetupHistoricSnapshot(snapshot_now, NULL);
633  PG_TRY();
634  {
635  rb->message(rb, txn, lsn, false, prefix, message_size, message);
636 
638  }
639  PG_CATCH();
640  {
642  PG_RE_THROW();
643  }
644  PG_END_TRY();
645  }
646 }
char * pstrdup(const char *in)
Definition: mcxt.c:1077
union ReorderBufferChange::@93 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferMessageCB message
MemoryContext context
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
#define PG_RE_THROW()
Definition: elog.h:314
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void * palloc(Size size)
Definition: mcxt.c:849
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
struct ReorderBufferChange::@93::@95 msg
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  change 
)
static

Definition at line 2437 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, dlist_push_tail(), MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, offsetof, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferGetChange(), ReorderBufferGetTupleBuf(), ReorderBufferTupleBufData, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, ReorderBufferChange::tp, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

2439 {
2440  ReorderBufferDiskChange *ondisk;
2441  ReorderBufferChange *change;
2442 
2443  ondisk = (ReorderBufferDiskChange *) data;
2444 
2445  change = ReorderBufferGetChange(rb);
2446 
2447  /* copy static part */
2448  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449 
2450  data += sizeof(ReorderBufferDiskChange);
2451 
2452  /* restore individual stuff */
2453  switch (change->action)
2454  {
2455  /* fall through these, they're all similar enough */
2460  if (change->data.tp.oldtuple)
2461  {
2462  uint32 tuplelen = ((HeapTuple) data)->t_len;
2463 
2464  change->data.tp.oldtuple =
2466 
2467  /* restore ->tuple */
2468  memcpy(&change->data.tp.oldtuple->tuple, data,
2469  sizeof(HeapTupleData));
2470  data += sizeof(HeapTupleData);
2471 
2472  /* reset t_data pointer into the new tuplebuf */
2473  change->data.tp.oldtuple->tuple.t_data =
2474  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475 
2476  /* restore tuple data itself */
2477  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478  data += tuplelen;
2479  }
2480 
2481  if (change->data.tp.newtuple)
2482  {
2483  /* here, data might not be suitably aligned! */
2484  uint32 tuplelen;
2485 
2486  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487  sizeof(uint32));
2488 
2489  change->data.tp.newtuple =
2491 
2492  /* restore ->tuple */
2493  memcpy(&change->data.tp.newtuple->tuple, data,
2494  sizeof(HeapTupleData));
2495  data += sizeof(HeapTupleData);
2496 
2497  /* reset t_data pointer into the new tuplebuf */
2498  change->data.tp.newtuple->tuple.t_data =
2499  ReorderBufferTupleBufData(change->data.tp.newtuple);
2500 
2501  /* restore tuple data itself */
2502  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503  data += tuplelen;
2504  }
2505 
2506  break;
2508  {
2509  Size prefix_size;
2510 
2511  /* read prefix */
2512  memcpy(&prefix_size, data, sizeof(Size));
2513  data += sizeof(Size);
2514  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515  prefix_size);
2516  memcpy(change->data.msg.prefix, data, prefix_size);
2517  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518  data += prefix_size;
2519 
2520  /* read the message */
2521  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522  data += sizeof(Size);
2523  change->data.msg.message = MemoryContextAlloc(rb->context,
2524  change->data.msg.message_size);
2525  memcpy(change->data.msg.message, data,
2526  change->data.msg.message_size);
2527  data += change->data.msg.message_size;
2528 
2529  break;
2530  }
2532  {
2533  Snapshot oldsnap;
2534  Snapshot newsnap;
2535  Size size;
2536 
2537  oldsnap = (Snapshot) data;
2538 
2539  size = sizeof(SnapshotData) +
2540  sizeof(TransactionId) * oldsnap->xcnt +
2541  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542 
2543  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544 
2545  newsnap = change->data.snapshot;
2546 
2547  memcpy(newsnap, data, size);
2548  newsnap->xip = (TransactionId *)
2549  (((char *) newsnap) + sizeof(SnapshotData));
2550  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551  newsnap->copied = true;
2552  break;
2553  }
2554  /* the base struct contains all the data, easy peasy */
2558  break;
2559  }
2560 
2561  dlist_push_tail(&txn->changes, &change->node);
2562  txn->nentries_mem++;
2563 }
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
HeapTupleData * HeapTuple
Definition: htup.h:70
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@93 data
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
struct SnapshotData * Snapshot
Definition: snapshot.h:23
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
dlist_head changes
struct ReorderBufferChange::@93::@94 tp
struct SnapshotData SnapshotData
unsigned int uint32
Definition: c.h:268
TransactionId * xip
Definition: snapshot.h:77
MemoryContext context
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:676
size_t Size
Definition: c.h:356
uint32 xcnt
Definition: snapshot.h:78
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
struct ReorderBufferChange::@93::@95 msg
ReorderBufferChange change
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:555
TransactionId * subxip
Definition: snapshot.h:89
int32 subxcnt
Definition: snapshot.h:90
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
int *  fd,
XLogSegNo segno 
)
static

Definition at line 2300 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, cleanup(), CloseTransientFile(), dlist_mutable_iter::cur, ReplicationSlot::data, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), ReorderBuffer::outbuf, PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, WAIT_EVENT_REORDER_BUFFER_READ, ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

2302 {
2303  Size restored = 0;
2304  XLogSegNo last_segno;
2305  dlist_mutable_iter cleanup_iter;
2306 
2309 
2310  /* free current entries, so we have memory for more */
2311  dlist_foreach_modify(cleanup_iter, &txn->changes)
2312  {
2314  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315 
2316  dlist_delete(&cleanup->node);
2317  ReorderBufferReturnChange(rb, cleanup);
2318  }
2319  txn->nentries_mem = 0;
2320  Assert(dlist_is_empty(&txn->changes));
2321 
2322  XLByteToSeg(txn->final_lsn, last_segno);
2323 
2324  while (restored < max_changes_in_memory && *segno <= last_segno)
2325  {
2326  int readBytes;
2327  ReorderBufferDiskChange *ondisk;
2328 
2329  if (*fd == -1)
2330  {
2331  XLogRecPtr recptr;
2332  char path[MAXPGPATH];
2333 
2334  /* first time in */
2335  if (*segno == 0)
2336  {
2337  XLByteToSeg(txn->first_lsn, *segno);
2338  }
2339 
2340  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342 
2343  /*
2344  * No need to care about TLIs here, only used during a single run,
2345  * so each LSN only maps to a specific WAL record.
2346  */
2347  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2349  (uint32) (recptr >> 32), (uint32) recptr);
2350 
2351  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352  if (*fd < 0 && errno == ENOENT)
2353  {
2354  *fd = -1;
2355  (*segno)++;
2356  continue;
2357  }
2358  else if (*fd < 0)
2359  ereport(ERROR,
2361  errmsg("could not open file \"%s\": %m",
2362  path)));
2363 
2364  }
2365 
2366  /*
2367  * Read the statically sized part of a change which has information
2368  * about the total size. If we couldn't read a record, we're at the
2369  * end of this file.
2370  */
2373  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2375 
2376  /* eof */
2377  if (readBytes == 0)
2378  {
2380  *fd = -1;
2381  (*segno)++;
2382  continue;
2383  }
2384  else if (readBytes < 0)
2385  ereport(ERROR,
2387  errmsg("could not read from reorderbuffer spill file: %m")));
2388  else if (readBytes != sizeof(ReorderBufferDiskChange))
2389  ereport(ERROR,
2391  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392  readBytes,
2393  (uint32) sizeof(ReorderBufferDiskChange))));
2394 
2395  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396 
2398  sizeof(ReorderBufferDiskChange) + ondisk->size);
2399  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400 
2402  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403  ondisk->size - sizeof(ReorderBufferDiskChange));
2405 
2406  if (readBytes < 0)
2407  ereport(ERROR,
2409  errmsg("could not read from reorderbuffer spill file: %m")));
2410  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411  ereport(ERROR,
2413  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414  readBytes,
2415  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416 
2417  /*
2418  * ok, read a full change from disk, now restore it into proper
2419  * in-memory format
2420  */
2421  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422  restored++;
2423  }
2424 
2425  return restored;
2426 }
XLogRecPtr first_lsn
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
ReplicationSlotPersistentData data
Definition: slot.h:116
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1039
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
XLogRecPtr final_lsn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1235
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
static void cleanup(void)
Definition: bootstrap.c:860
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1211
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
static const Size max_changes_in_memory
#define read(a, b, c)
Definition: win32.h:13
static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2569 of file reorderbuffer.c.

References Assert, cur, ReplicationSlot::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, unlink(), ReorderBufferTXN::xid, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCleanupTXN().

2570 {
2571  XLogSegNo first;
2572  XLogSegNo cur;
2573  XLogSegNo last;
2574 
2577 
2578  XLByteToSeg(txn->first_lsn, first);
2579  XLByteToSeg(txn->final_lsn, last);
2580 
2581  /* iterate over all possible filenames, and delete them */
2582  for (cur = first; cur <= last; cur++)
2583  {
2584  char path[MAXPGPATH];
2585  XLogRecPtr recptr;
2586 
2587  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588 
2589  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2591  (uint32) (recptr >> 32), (uint32) recptr);
2592  if (unlink(path) != 0 && errno != ENOENT)
2593  ereport(ERROR,
2595  errmsg("could not remove file \"%s\": %m", path)));
2596  }
2597 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct cursor * cur
Definition: ecpg.c:28
ReplicationSlotPersistentData data
Definition: slot.h:116
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:34
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
XLogRecPtr final_lsn
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xid
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
void ReorderBufferReturnChange ( ReorderBuffer rb,
ReorderBufferChange change 
)

Definition at line 364 of file reorderbuffer.c.

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::msg, NULL, pfree(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferFreeSnap(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, and ReorderBufferChange::tp.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferCommit(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferToastReset().

365 {
366  /* free contained data */
367  switch (change->action)
368  {
373  if (change->data.tp.newtuple)
374  {
375  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
376  change->data.tp.newtuple = NULL;
377  }
378 
379  if (change->data.tp.oldtuple)
380  {
381  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
382  change->data.tp.oldtuple = NULL;
383  }
384  break;
386  if (change->data.msg.prefix != NULL)
387  pfree(change->data.msg.prefix);
388  change->data.msg.prefix = NULL;
389  if (change->data.msg.message != NULL)
390  pfree(change->data.msg.message);
391  change->data.msg.message = NULL;
392  break;
394  if (change->data.snapshot)
395  {
396  ReorderBufferFreeSnap(rb, change->data.snapshot);
397  change->data.snapshot = NULL;
398  }
399  break;
400  /* no data in addition to the struct itself */
404  break;
405  }
406 
407  pfree(change);
408 }
union ReorderBufferChange::@93 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void pfree(void *pointer)
Definition: mcxt.c:950
struct ReorderBufferChange::@93::@94 tp
#define NULL
Definition: c.h:229
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
struct ReorderBufferChange::@93::@95 msg
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferReturnTupleBuf ( ReorderBuffer rb,
ReorderBufferTupleBuf tuple 
)

Definition at line 469 of file reorderbuffer.c.

References ReorderBufferTupleBuf::alloc_tuple_size, ReorderBuffer::cached_tuplebufs, max_cached_tuplebufs, MaxHeapTupleSize, ReorderBufferTupleBuf::node, ReorderBuffer::nr_cached_tuplebufs, pfree(), slist_push_head(), HeapTupleData::t_data, ReorderBufferTupleBuf::tuple, VALGRIND_MAKE_MEM_DEFINED, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by ReorderBufferReturnChange().

470 {
471  /* check whether to put into the slab cache, oversized tuples never are */
472  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
474  {
475  rb->nr_cached_tuplebufs++;
476  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
479  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
481  }
482  else
483  {
484  pfree(tuple);
485  }
486 }
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
static const Size max_cached_tuplebufs
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:950
#define MaxHeapTupleSize
Definition: htup_details.h:561
HeapTupleData tuple
Definition: reorderbuffer.h:27
Size nr_cached_tuplebufs
slist_head cached_tuplebufs
static void ReorderBufferReturnTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 316 of file reorderbuffer.c.

References ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, NULL, pfree(), ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

317 {
318  /* clean the lookup cache if we were cached (quite likely) */
319  if (rb->by_txn_last_xid == txn->xid)
320  {
322  rb->by_txn_last_txn = NULL;
323  }
324 
325  /* free data that's contained */
326 
327  if (txn->tuplecid_hash != NULL)
328  {
330  txn->tuplecid_hash = NULL;
331  }
332 
333  if (txn->invalidations)
334  {
335  pfree(txn->invalidations);
336  txn->invalidations = NULL;
337  }
338 
339  pfree(txn);
340 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:810
TransactionId by_txn_last_xid
void pfree(void *pointer)
Definition: mcxt.c:950
#define InvalidTransactionId
Definition: transam.h:31
ReorderBufferTXN * by_txn_last_txn
TransactionId xid
#define NULL
Definition: c.h:229
SharedInvalidationMessage * invalidations
static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 2136 of file reorderbuffer.c.

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferChange::msg, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, WAIT_EVENT_REORDER_BUFFER_WRITE, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

2138 {
2139  ReorderBufferDiskChange *ondisk;
2140  Size sz = sizeof(ReorderBufferDiskChange);
2141 
2143 
2144  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146 
2147  switch (change->action)
2148  {
2149  /* fall through these, they're all similar enough */
2154  {
2155  char *data;
2156  ReorderBufferTupleBuf *oldtup,
2157  *newtup;
2158  Size oldlen = 0;
2159  Size newlen = 0;
2160 
2161  oldtup = change->data.tp.oldtuple;
2162  newtup = change->data.tp.newtuple;
2163 
2164  if (oldtup)
2165  {
2166  sz += sizeof(HeapTupleData);
2167  oldlen = oldtup->tuple.t_len;
2168  sz += oldlen;
2169  }
2170 
2171  if (newtup)
2172  {
2173  sz += sizeof(HeapTupleData);
2174  newlen = newtup->tuple.t_len;
2175  sz += newlen;
2176  }
2177 
2178  /* make sure we have enough space */
2180 
2181  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182  /* might have been reallocated above */
2183  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184 
2185  if (oldlen)
2186  {
2187  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188  data += sizeof(HeapTupleData);
2189 
2190  memcpy(data, oldtup->tuple.t_data, oldlen);
2191  data += oldlen;
2192  }
2193 
2194  if (newlen)
2195  {
2196  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197  data += sizeof(HeapTupleData);
2198 
2199  memcpy(data, newtup->tuple.t_data, newlen);
2200  data += newlen;
2201  }
2202  break;
2203  }
2205  {
2206  char *data;
2207  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2208 
2209  sz += prefix_size + change->data.msg.message_size +
2210  sizeof(Size) + sizeof(Size);
2212 
2213  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214 
2215  /* might have been reallocated above */
2216  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217 
2218  /* write the prefix including the size */
2219  memcpy(data, &prefix_size, sizeof(Size));
2220  data += sizeof(Size);
2221  memcpy(data, change->data.msg.prefix,
2222  prefix_size);
2223  data += prefix_size;
2224 
2225  /* write the message including the size */
2226  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227  data += sizeof(Size);
2228  memcpy(data, change->data.msg.message,
2229  change->data.msg.message_size);
2230  data += change->data.msg.message_size;
2231 
2232  break;
2233  }
2235  {
2236  Snapshot snap;
2237  char *data;
2238 
2239  snap = change->data.snapshot;
2240 
2241  sz += sizeof(SnapshotData) +
2242  sizeof(TransactionId) * snap->xcnt +
2243  sizeof(TransactionId) * snap->subxcnt
2244  ;
2245 
2246  /* make sure we have enough space */
2248  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249  /* might have been reallocated above */
2250  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251 
2252  memcpy(data, snap, sizeof(SnapshotData));
2253  data += sizeof(SnapshotData);
2254 
2255  if (snap->xcnt)
2256  {
2257  memcpy(data, snap->xip,
2258  sizeof(TransactionId) * snap->xcnt);
2259  data += sizeof(TransactionId) * snap->xcnt;
2260  }
2261 
2262  if (snap->subxcnt)
2263  {
2264  memcpy(data, snap->subxip,
2265  sizeof(TransactionId) * snap->subxcnt);
2266  data += sizeof(TransactionId) * snap->subxcnt;
2267  }
2268  break;
2269  }
2273  /* ReorderBufferChange contains everything important */
2274  break;
2275  }
2276 
2277  ondisk->size = sz;
2278 
2280  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281  {
2282  int save_errno = errno;
2283 
2285  errno = save_errno;
2286  ereport(ERROR,
2288  errmsg("could not write to data file for XID %u: %m",
2289  txn->xid)));
2290  }
2292 
2293  Assert(ondisk->change.action == change->action);
2294 }
uint32 TransactionId
Definition: c.h:397
#define write(a, b, c)
Definition: win32.h:14
union ReorderBufferChange::@93 data
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
static int fd(const char *x, int i)
Definition: preproc-init.c:105
HeapTupleHeader t_data
Definition: htup.h:67
#define ERROR
Definition: elog.h:43
struct ReorderBufferChange::@93::@94 tp
uint32 t_len
Definition: htup.h:64
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1235
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId * xip
Definition: snapshot.h:77
int CloseTransientFile(int fd)
Definition: fd.c:2305
TransactionId xid
struct ReorderBufferDiskChange ReorderBufferDiskChange
#define Assert(condition)
Definition: c.h:676
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1211
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
uint32 xcnt
Definition: snapshot.h:78
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct ReorderBufferChange::@93::@95 msg
ReorderBufferChange change
struct HeapTupleData HeapTupleData
TransactionId * subxip
Definition: snapshot.h:89
int32 subxcnt
Definition: snapshot.h:90
static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 2019 of file reorderbuffer.c.

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

2020 {
2021  if (!rb->outbufsize)
2022  {
2023  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2024  rb->outbufsize = sz;
2025  }
2026  else if (rb->outbufsize < sz)
2027  {
2028  rb->outbuf = repalloc(rb->outbuf, sz);
2029  rb->outbufsize = sz;
2030  }
2031 }
MemoryContext context
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:963
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2054 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, ReplicationSlot::data, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferTXN::serialized, ReorderBufferTXN::subtxns, ReorderBufferTXN::xid, XLByteInSeg, XLByteToSeg, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferCheckSerializeTXN(), and ReorderBufferIterTXNInit().

2055 {
2056  dlist_iter subtxn_i;
2057  dlist_mutable_iter change_i;
2058  int fd = -1;
2059  XLogSegNo curOpenSegNo = 0;
2060  Size spilled = 0;
2061  char path[MAXPGPATH];
2062 
2063  elog(DEBUG2, "spill %u changes in XID %u to disk",
2064  (uint32) txn->nentries_mem, txn->xid);
2065 
2066  /* do the same to all child TXs */
2067  dlist_foreach(subtxn_i, &txn->subtxns)
2068  {
2069  ReorderBufferTXN *subtxn;
2070 
2071  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2072  ReorderBufferSerializeTXN(rb, subtxn);
2073  }
2074 
2075  /* serialize changestream */
2076  dlist_foreach_modify(change_i, &txn->changes)
2077  {
2078  ReorderBufferChange *change;
2079 
2080  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2081 
2082  /*
2083  * store in segment in which it belongs by start lsn, don't split over
2084  * multiple segments tho
2085  */
2086  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2087  {
2088  XLogRecPtr recptr;
2089 
2090  if (fd != -1)
2091  CloseTransientFile(fd);
2092 
2093  XLByteToSeg(change->lsn, curOpenSegNo);
2094  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2095 
2096  /*
2097  * No need to care about TLIs here, only used during a single run,
2098  * so each LSN only maps to a specific WAL record.
2099  */
2100  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2102  (uint32) (recptr >> 32), (uint32) recptr);
2103 
2104  /* open segment, create it if necessary */
2105  fd = OpenTransientFile(path,
2106  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2107  S_IRUSR | S_IWUSR);
2108 
2109  if (fd < 0)
2110  ereport(ERROR,
2112  errmsg("could not open file \"%s\": %m",
2113  path)));
2114  }
2115 
2116  ReorderBufferSerializeChange(rb, txn, fd, change);
2117  dlist_delete(&change->node);
2118  ReorderBufferReturnChange(rb, change);
2119 
2120  spilled++;
2121  }
2122 
2123  Assert(spilled == txn->nentries_mem);
2124  Assert(dlist_is_empty(&txn->changes));
2125  txn->nentries_mem = 0;
2126  txn->serialized = true;
2127 
2128  if (fd != -1)
2129  CloseTransientFile(fd);
2130 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReplicationSlotPersistentData data
Definition: slot.h:116
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1039
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define ERROR
Definition: elog.h:43
dlist_head changes
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:356
dlist_head subtxns
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
#define elog
Definition: elog.h:219
#define XLByteInSeg(xlrp, logSegNo)
void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 1856 of file reorderbuffer.c.

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

1858 {
1859  ReorderBufferTXN *txn;
1860  bool is_new;
1861 
1862  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1863  Assert(txn->base_snapshot == NULL);
1864  Assert(snap != NULL);
1865 
1866  txn->base_snapshot = snap;
1867  txn->base_snapshot_lsn = lsn;
1868 }
Snapshot base_snapshot
XLogRecPtr base_snapshot_lsn
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 693 of file reorderbuffer.c.

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

694 {
696 }
XLogRecPtr current_restart_decoding_lsn
static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2691 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32, DatumGetObjectId, DatumGetPointer, dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr, HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::node, NULL, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferCommit().

2693 {
2694  ReorderBufferToastEnt *ent;
2695  ReorderBufferTupleBuf *newtup;
2696  bool found;
2697  int32 chunksize;
2698  bool isnull;
2699  Pointer chunk;
2700  TupleDesc desc = RelationGetDescr(relation);
2701  Oid chunk_id;
2702  int32 chunk_seq;
2703 
2704  if (txn->toast_hash == NULL)
2705  ReorderBufferToastInitHash(rb, txn);
2706 
2707  Assert(IsToastRelation(relation));
2708 
2709  newtup = change->data.tp.newtuple;
2710  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711  Assert(!isnull);
2712  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713  Assert(!isnull);
2714 
2715  ent = (ReorderBufferToastEnt *)
2716  hash_search(txn->toast_hash,
2717  (void *) &chunk_id,
2718  HASH_ENTER,
2719  &found);
2720 
2721  if (!found)
2722  {
2723  Assert(ent->chunk_id == chunk_id);
2724  ent->num_chunks = 0;
2725  ent->last_chunk_seq = 0;
2726  ent->size = 0;
2727  ent->reconstructed = NULL;
2728  dlist_init(&ent->chunks);
2729 
2730  if (chunk_seq != 0)
2731  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732  chunk_seq, chunk_id);
2733  }
2734  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737 
2738  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739  Assert(!isnull);
2740 
2741  /* calculate size so we can allocate the right size at once later */
2742  if (!VARATT_IS_EXTENDED(chunk))
2743  chunksize = VARSIZE(chunk) - VARHDRSZ;
2744  else if (VARATT_IS_SHORT(chunk))
2745  /* could happen due to heap_form_tuple doing its thing */
2746  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747  else
2748  elog(ERROR, "unexpected type of toast chunk");
2749 
2750  ent->size += chunksize;
2751  ent->last_chunk_seq = chunk_seq;
2752  ent->num_chunks++;
2753  dlist_push_tail(&ent->chunks, &change->node);
2754 }
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define DatumGetInt32(X)
Definition: postgres.h:478
#define RelationGetDescr(relation)
Definition: rel.h:428
#define VARHDRSZ_SHORT
Definition: postgres.h:269
#define VARSIZE(PTR)
Definition: postgres.h:304
#define VARHDRSZ
Definition: c.h:445
#define DatumGetObjectId(X)
Definition: postgres.h:506
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
union ReorderBufferChange::@93 data
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
unsigned int Oid
Definition: postgres_ext.h:31
signed int int32
Definition: c.h:256
char * Pointer
Definition: c.h:245
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
struct ReorderBufferChange::@93::@94 tp
struct varlena * reconstructed
HeapTupleData tuple
Definition: reorderbuffer.h:27
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:306
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:326
#define DatumGetPointer(X)
Definition: postgres.h:555
#define elog
Definition: elog.h:219
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2670 of file reorderbuffer.c.

References Assert, ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, NULL, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

2671 {
2672  HASHCTL hash_ctl;
2673 
2674  Assert(txn->toast_hash == NULL);
2675 
2676  memset(&hash_ctl, 0, sizeof(hash_ctl));
2677  hash_ctl.keysize = sizeof(Oid);
2678  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679  hash_ctl.hcxt = rb->context;
2680  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2682 }
struct ReorderBufferToastEnt ReorderBufferToastEnt
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
unsigned int Oid
Definition: postgres_ext.h:31
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext context
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 2764 of file reorderbuffer.c.

References Assert, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer, dlist_container, dlist_foreach, fastgetattr, free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), tupleDesc::natts, NULL, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum, RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationIdGetRelation(), ReorderBufferTupleBufData, SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, ReorderBufferTupleBuf::tuple, TupleDescAttr, varatt_external::va_extsize, varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferCommit().

2766 {
2767  TupleDesc desc;
2768  int natt;
2769  Datum *attrs;
2770  bool *isnull;
2771  bool *free;
2772  HeapTuple tmphtup;
2773  Relation toast_rel;
2774  TupleDesc toast_desc;
2775  MemoryContext oldcontext;
2776  ReorderBufferTupleBuf *newtup;
2777 
2778  /* no toast tuples changed */
2779  if (txn->toast_hash == NULL)
2780  return;
2781 
2782  oldcontext = MemoryContextSwitchTo(rb->context);
2783 
2784  /* we should only have toast tuples in an INSERT or UPDATE */
2785  Assert(change->data.tp.newtuple);
2786 
2787  desc = RelationGetDescr(relation);
2788 
2789  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790  toast_desc = RelationGetDescr(toast_rel);
2791 
2792  /* should we allocate from stack instead? */
2793  attrs = palloc0(sizeof(Datum) * desc->natts);
2794  isnull = palloc0(sizeof(bool) * desc->natts);
2795  free = palloc0(sizeof(bool) * desc->natts);
2796 
2797  newtup = change->data.tp.newtuple;
2798 
2799  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800 
2801  for (natt = 0; natt < desc->natts; natt++)
2802  {
2803  Form_pg_attribute attr = TupleDescAttr(desc, natt);
2804  ReorderBufferToastEnt *ent;
2805  struct varlena *varlena;
2806 
2807  /* va_rawsize is the size of the original datum -- including header */
2808  struct varatt_external toast_pointer;
2809  struct varatt_indirect redirect_pointer;
2810  struct varlena *new_datum = NULL;
2811  struct varlena *reconstructed;
2812  dlist_iter it;
2813  Size data_done = 0;
2814 
2815  /* system columns aren't toasted */
2816  if (attr->attnum < 0)
2817  continue;
2818 
2819  if (attr->attisdropped)
2820  continue;
2821 
2822  /* not a varlena datatype */
2823  if (attr->attlen != -1)
2824  continue;
2825 
2826  /* no data */
2827  if (isnull[natt])
2828  continue;
2829 
2830  /* ok, we know we have a toast datum */
2831  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832 
2833  /* no need to do anything if the tuple isn't external */
2834  if (!VARATT_IS_EXTERNAL(varlena))
2835  continue;
2836 
2837  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838 
2839  /*
2840  * Check whether the toast tuple changed, replace if so.
2841  */
2842  ent = (ReorderBufferToastEnt *)
2843  hash_search(txn->toast_hash,
2844  (void *) &toast_pointer.va_valueid,
2845  HASH_FIND,
2846  NULL);
2847  if (ent == NULL)
2848  continue;
2849 
2850  new_datum =
2851  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852 
2853  free[natt] = true;
2854 
2855  reconstructed = palloc0(toast_pointer.va_rawsize);
2856 
2857  ent->reconstructed = reconstructed;
2858 
2859  /* stitch toast tuple back together from its parts */
2860  dlist_foreach(it, &ent->chunks)
2861  {
2862  bool isnull;
2863  ReorderBufferChange *cchange;
2864  ReorderBufferTupleBuf *ctup;
2865  Pointer chunk;
2866 
2867  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868  ctup = cchange->data.tp.newtuple;
2869  chunk = DatumGetPointer(
2870  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871 
2872  Assert(!isnull);
2873  Assert(!VARATT_IS_EXTERNAL(chunk));
2874  Assert(!VARATT_IS_SHORT(chunk));
2875 
2876  memcpy(VARDATA(reconstructed) + data_done,
2877  VARDATA(chunk),
2878  VARSIZE(chunk) - VARHDRSZ);
2879  data_done += VARSIZE(chunk) - VARHDRSZ;
2880  }
2881  Assert(data_done == toast_pointer.va_extsize);
2882 
2883  /* make sure its marked as compressed or not */
2884  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886  else
2887  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888 
2889  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890  redirect_pointer.pointer = reconstructed;
2891 
2893  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894  sizeof(redirect_pointer));
2895 
2896  attrs[natt] = PointerGetDatum(new_datum);
2897  }
2898 
2899  /*
2900  * Build tuple in separate memory & copy tuple back into the tuplebuf
2901  * passed to the output plugin. We can't directly heap_fill_tuple() into
2902  * the tuplebuf because attrs[] will point back into the current content.
2903  */
2904  tmphtup = heap_form_tuple(desc, attrs, isnull);
2905  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907 
2908  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909  newtup->tuple.t_len = tmphtup->t_len;
2910 
2911  /*
2912  * free resources we won't further need, more persistent stuff will be
2913  * free'd in ReorderBufferToastReset().
2914  */
2915  RelationClose(toast_rel);
2916  pfree(tmphtup);
2917  for (natt = 0; natt < desc->natts; natt++)
2918  {
2919  if (free[natt])
2920  pfree(DatumGetPointer(attrs[natt]));
2921  }
2922  pfree(attrs);
2923  pfree(free);
2924  pfree(isnull);
2925 
2926  MemoryContextSwitchTo(oldcontext);
2927 }
#define VARDATA(PTR)
Definition: postgres.h:303
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
#define RelationGetDescr(relation)
Definition: rel.h:428
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define VARSIZE(PTR)
Definition: postgres.h:304
#define PointerGetDatum(X)
Definition: postgres.h:562
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:84
#define VARHDRSZ
Definition: c.h:445
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
union ReorderBufferChange::@93 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
Form_pg_class rd_rel
Definition: rel.h:114
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:311
int natts
Definition: tupdesc.h:73
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:314
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
char * Pointer
Definition: c.h:245
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
struct ReorderBufferChange::@93::@94 tp
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:332
HeapTupleData tuple
Definition: reorderbuffer.h:27
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void RelationClose(Relation relation)
Definition: relcache.c:2164
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
MemoryContext context
void * palloc0(Size size)
Definition: mcxt.c:878
uintptr_t Datum
Definition: postgres.h:372
dlist_node * cur
Definition: ilist.h:161
#define free(a)
Definition: header.h:65
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
size_t Size
Definition: c.h:356
#define DatumGetPointer(X)
Definition: postgres.h:555
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:936
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:330
Definition: c.h:439
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:328
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2075
static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2933 of file reorderbuffer.c.

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, NULL, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferReturnChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCommit().

2934 {
2935  HASH_SEQ_STATUS hstat;
2936  ReorderBufferToastEnt *ent;
2937 
2938  if (txn->toast_hash == NULL)
2939  return;
2940 
2941  /* sequentially walk over the hash and free everything */
2942  hash_seq_init(&hstat, txn->toast_hash);
2943  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944  {
2945  dlist_mutable_iter it;
2946 
2947  if (ent->reconstructed != NULL)
2948  pfree(ent->reconstructed);
2949 
2950  dlist_foreach_modify(it, &ent->chunks)
2951  {
2952  ReorderBufferChange *change =
2954 
2955  dlist_delete(&change->node);
2956  ReorderBufferReturnChange(rb, change);
2957  }
2958  }
2959 
2960  hash_destroy(txn->toast_hash);
2961  txn->toast_hash = NULL;
2962 }
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:810
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
struct varlena * reconstructed
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define NULL
Definition: c.h:229
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 495 of file reorderbuffer.c.

References Assert, AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, NULL, ReorderBufferGetTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferForget(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

497 {
498  ReorderBufferTXN *txn;
500  bool found;
501 
503  Assert(!create || lsn != InvalidXLogRecPtr);
504 
505  /*
506  * Check the one-entry lookup cache first
507  */
509  rb->by_txn_last_xid == xid)
510  {
511  txn = rb->by_txn_last_txn;
512 
513  if (txn != NULL)
514  {
515  /* found it, and it's valid */
516  if (is_new)
517  *is_new = false;
518  return txn;
519  }
520 
521  /*
522  * cached as non-existent, and asked not to create? Then nothing else
523  * to do.
524  */
525  if (!create)
526  return NULL;
527  /* otherwise fall through to create it */
528  }
529 
530  /*
531  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
532  * to create an entry.
533  */
534 
535  /* search the lookup table */
536  ent = (ReorderBufferTXNByIdEnt *)
537  hash_search(rb->by_txn,
538  (void *) &xid,
539  create ? HASH_ENTER : HASH_FIND,
540  &found);
541  if (found)
542  txn = ent->txn;
543  else if (create)
544  {
545  /* initialize the new entry, if creation was requested */
546  Assert(ent != NULL);
547 
548  ent->txn = ReorderBufferGetTXN(rb);
549  ent->txn->xid = xid;
550  txn = ent->txn;
551  txn->first_lsn = lsn;
553 
554  if (create_as_top)
555  {
556  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
557  AssertTXNLsnOrder(rb);
558  }
559  }
560  else
561  txn = NULL; /* not found and not asked to create */
562 
563  /* update cache */
564  rb->by_txn_last_xid = xid;
565  rb->by_txn_last_txn = txn;
566 
567  if (is_new)
568  *is_new = !found;
569 
570  Assert(!create || txn != NULL);
571  return txn;
572 }
XLogRecPtr first_lsn
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId by_txn_last_xid
XLogRecPtr current_restart_decoding_lsn
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
ReorderBufferTXN * by_txn_last_txn
dlist_head toplevel_by_lsn
TransactionId xid
static void AssertTXNLsnOrder(ReorderBuffer *rb)
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
ReorderBufferTXN * txn
Definition: reorderbuffer.c:82
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1989 of file reorderbuffer.c.

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeNewCatalogSnapshot(), and SnapBuildProcessChange().

1990 {
1991  ReorderBufferTXN *txn;
1992 
1993  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1994  false);
1995 
1996  /* transaction isn't known yet, ergo no snapshot */
1997  if (txn == NULL)
1998  return false;
1999 
2000  /*
2001  * TODO: It would be a nice improvement if we would check the toplevel
2002  * transaction in subtransactions, but we'd need to keep track of a bit
2003  * more state.
2004  */
2005  return txn->base_snapshot != NULL;
2006 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 1973 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, InvalidXLogRecPtr, NULL, and ReorderBufferTXNByXid().

Referenced by SnapBuildCommitTxn().

1974 {
1975  ReorderBufferTXN *txn;
1976 
1977  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1978  false);
1979  if (txn == NULL)
1980  return false;
1981 
1982  return txn->has_catalog_changes;
1983 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1958 of file reorderbuffer.c.

References ReorderBufferTXN::has_catalog_changes, NULL, and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), DecodeHeapOp(), and SnapBuildProcessNewCid().

1960 {
1961  ReorderBufferTXN *txn;
1962 
1963  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1964 
1965  txn->has_catalog_changes = true;
1966 }
#define NULL
Definition: c.h:229
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 3241 of file reorderbuffer.c.

References Assert, BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy, ItemPointerGetBlockNumber, MAIN_FORKNUM, NULL, ReorderBufferTupleCidKey::relnode, HeapTupleData::t_self, HeapTupleData::t_tableOid, ReorderBufferTupleCidKey::tid, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

3245 {
3248  ForkNumber forkno;
3249  BlockNumber blockno;
3250  bool updated_mapping = false;
3251 
3252  /* be careful about padding */
3253  memset(&key, 0, sizeof(key));
3254 
3256 
3257  /*
3258  * get relfilenode from the buffer, no convenient way to access it other
3259  * than that.
3260  */
3261  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262 
3263  /* tuples can only be in the main fork */
3264  Assert(forkno == MAIN_FORKNUM);
3265  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266 
3267  ItemPointerCopy(&htup->t_self,
3268  &key.tid);
3269 
3270 restart:
3271  ent = (ReorderBufferTupleCidEnt *)
3272  hash_search(tuplecid_data,
3273  (void *) &key,
3274  HASH_FIND,
3275  NULL);
3276 
3277  /*
3278  * failed to find a mapping, check whether the table was rewritten and
3279  * apply mapping if so, but only do that once - there can be no new
3280  * mappings while we are in here since we have to hold a lock on the
3281  * relation.
3282  */
3283  if (ent == NULL && !updated_mapping)
3284  {
3285  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286  /* now check but don't update for a mapping again */
3287  updated_mapping = true;
3288  goto restart;
3289  }
3290  else if (ent == NULL)
3291  return false;
3292 
3293  if (cmin)
3294  *cmin = ent->cmin;
3295  if (cmax)
3296  *cmax = ent->cmax;
3297  return true;
3298 }
uint32 BlockNumber
Definition: block.h:31
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66
ForkNumber
Definition: relpath.h:24
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
#define BufferIsLocal(buffer)
Definition: buf.h:37
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:76
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139
void StartupReorderBuffer ( void  )

Definition at line 2604 of file reorderbuffer.c.

References AllocateDir(), dirent::d_name, DEBUG2, ereport, errcode_for_file_access(), errmsg(), FreeDir(), lstat, MAXPGPATH, NULL, PANIC, ReadDir(), ReplicationSlotValidateName(), and unlink().

Referenced by StartupXLOG().

2605 {
2606  DIR *logical_dir;
2607  struct dirent *logical_de;
2608 
2609  DIR *spill_dir;
2610  struct dirent *spill_de;
2611 
2612  logical_dir = AllocateDir("pg_replslot");
2613  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614  {
2615  struct stat statbuf;
2616  char path[MAXPGPATH * 2 + 12];
2617 
2618  if (strcmp(logical_de->d_name, ".") == 0 ||
2619  strcmp(logical_de->d_name, "..") == 0)
2620  continue;
2621 
2622  /* if it cannot be a slot, skip the directory */
2623  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624  continue;
2625 
2626  /*
2627  * ok, has to be a surviving logical slot, iterate and delete
2628  * everything starting with xid-*
2629  */
2630  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 
2632  /* we're only creating directories here, skip if it's not our's */
2633  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634  continue;
2635 
2636  spill_dir = AllocateDir(path);
2637  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638  {
2639  if (strcmp(spill_de->d_name, ".") == 0 ||
2640  strcmp(spill_de->d_name, "..") == 0)
2641  continue;
2642 
2643  /* only look at names that can be ours */
2644  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645  {
2646  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647  spill_de->d_name);
2648 
2649  if (unlink(path) != 0)
2650  ereport(PANIC,
2652  errmsg("could not remove file \"%s\": %m",
2653  path)));
2654  }
2655  }
2656  FreeDir(spill_dir);
2657  }
2658  FreeDir(logical_dir);
2659 }
Definition: dirent.h:9
#define PANIC
Definition: elog.h:53
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
Definition: dirent.c:25
#define MAXPGPATH
#define DEBUG2
Definition: elog.h:24
int errcode_for_file_access(void)
Definition: elog.c:598
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define lstat(path, sb)
Definition: win32.h:262
int FreeDir(DIR *dir)
Definition: fd.c:2444
static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static

Definition at line 3125 of file reorderbuffer.c.

References NULL, and xidComparator().

Referenced by UpdateLogicalMappings().

3126 {
3127  return bsearch(&xid, xip, num,
3128  sizeof(TransactionId), xidComparator) != NULL;
3129 }
uint32 TransactionId
Definition: c.h:397
#define NULL
Definition: c.h:229
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
static void UpdateLogicalMappings ( HTAB tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 3152 of file reorderbuffer.c.

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_length(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, NULL, palloc(), pfree(), qsort, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), and TransactionIdInArray().

Referenced by ResolveCminCmaxDuringDecoding().

3153 {
3154  DIR *mapping_dir;
3155  struct dirent *mapping_de;
3156  List *files = NIL;
3157  ListCell *file;
3158  RewriteMappingFile **files_a;
3159  size_t off;
3160  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161 
3162  mapping_dir = AllocateDir("pg_logical/mappings");
3163  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164  {
3165  Oid f_dboid;
3166  Oid f_relid;
3167  TransactionId f_mapped_xid;
3168  TransactionId f_create_xid;
3169  XLogRecPtr f_lsn;
3170  uint32 f_hi,