PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
#include "storage/procarray.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

Function Documentation

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 248 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by copy_heap_data().

251 {
252  RewriteState state;
253  MemoryContext rw_cxt;
254  MemoryContext old_cxt;
255  HASHCTL hash_ctl;
256 
257  /*
258  * To ease cleanup, make a separate context that will contain the
259  * RewriteState struct itself plus all subsidiary data.
260  */
261  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
262  "Table rewrite",
263  ALLOCSET_DEFAULT_SIZES);
264  old_cxt = MemoryContextSwitchTo(rw_cxt);
265 
266  /* Create and fill in the state struct */
267  state = palloc0(sizeof(RewriteStateData));
268 
269  state->rs_old_rel = old_heap;
270  state->rs_new_rel = new_heap;
271  state->rs_buffer = (Page) palloc(BLCKSZ);
272  /* new_heap needn't be empty, just locked */
273  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274  state->rs_buffer_valid = false;
275  state->rs_use_wal = use_wal;
276  state->rs_oldest_xmin = oldest_xmin;
277  state->rs_freeze_xid = freeze_xid;
278  state->rs_cutoff_multi = cutoff_multi;
279  state->rs_cxt = rw_cxt;
280 
281  /* Initialize hash tables used to track update chains */
282  memset(&hash_ctl, 0, sizeof(hash_ctl));
283  hash_ctl.keysize = sizeof(TidHashKey);
284  hash_ctl.entrysize = sizeof(UnresolvedTupData);
285  hash_ctl.hcxt = state->rs_cxt;
286 
287  state->rs_unresolved_tups =
288  hash_create("Rewrite / Unresolved ctids",
289  128, /* arbitrary initial size */
290  &hash_ctl,
291  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
292 
293  hash_ctl.entrysize = sizeof(OldToNewMappingData);
294 
295  state->rs_old_new_tid_map =
296  hash_create("Rewrite / Old to new tid map",
297  128, /* arbitrary initial size */
298  &hash_ctl,
299  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
300 
301  MemoryContextSwitchTo(old_cxt);
302 
303  logical_begin_heap_rewrite(state);
304 
305  return state;
306 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:144
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:877
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:199
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:799
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:848
Relation rs_old_rel
Definition: rewriteheap.c:143
BlockNumber rs_blockno
Definition: rewriteheap.c:146
Pointer Page
Definition: bufpage.h:74
void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1198 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf(), and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1199 {
1200  XLogRecPtr cutoff;
1201  XLogRecPtr redo;
1202  DIR *mappings_dir;
1203  struct dirent *mapping_de;
1204  char path[MAXPGPATH + 20];
1205 
1206  /*
1207  * We start off with a minimum of the last redo pointer. No new decoding
1208  * slot will start before that, so that's a safe upper bound for removal.
1209  */
1210  redo = GetRedoRecPtr();
1211 
1212  /* now check for the restart ptrs from existing slots */
1213  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1214 
1215  /* don't start earlier than the restart lsn */
1216  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1217  cutoff = redo;
1218 
1219  mappings_dir = AllocateDir("pg_logical/mappings");
1220  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1221  {
1222  struct stat statbuf;
1223  Oid dboid;
1224  Oid relid;
1225  XLogRecPtr lsn;
1226  TransactionId rewrite_xid;
1227  TransactionId create_xid;
1228  uint32 hi,
1229  lo;
1230 
1231  if (strcmp(mapping_de->d_name, ".") == 0 ||
1232  strcmp(mapping_de->d_name, "..") == 0)
1233  continue;
1234 
1235  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1236  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1237  continue;
1238 
1239  /* Skip over files that cannot be ours. */
1240  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1241  continue;
1242 
1243  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1244  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1245  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1246 
1247  lsn = ((uint64) hi) << 32 | lo;
1248 
1249  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1250  {
1251  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1252  if (unlink(path) < 0)
1253  ereport(ERROR,
1254  (errcode_for_file_access(),
1255  errmsg("could not remove file \"%s\": %m", path)));
1256  }
1257  else
1258  {
1259  int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1260 
1261  /*
1262  * The file cannot vanish due to concurrency since this function
1263  * is the only one removing logical mappings and it's run while
1264  * CheckpointLock is held exclusively.
1265  */
1266  if (fd < 0)
1267  ereport(ERROR,
1268  (errcode_for_file_access(),
1269  errmsg("could not open file \"%s\": %m", path)));
1270 
1271  /*
1272  * We could try to avoid fsyncing files that either haven't
1273  * changed or have only been created since the checkpoint's start,
1274  * but it's currently not deemed worth the effort.
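1275  */
1276  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1277  if (pg_fsync(fd) != 0)
1278  ereport(ERROR,
1279  (errcode_for_file_access(),
1280  errmsg("could not fsync file \"%s\": %m", path)));
1281  pgstat_report_wait_end();
1282  CloseTransientFile(fd);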
1283  }
1284  }
1285  FreeDir(mappings_dir);
1286 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:391
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1044
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2167
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:784
unsigned int uint32
Definition: c.h:258
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2367
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2337
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2433
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1220
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8247
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
int pg_fsync(int fd)
Definition: fd.c:338
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define lstat(path, sb)
Definition: win32.h:262
int FreeDir(DIR *dir)
Definition: fd.c:2476
void end_heap_rewrite ( RewriteState  state)

Definition at line 314 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by copy_heap_data().

315 {
316  HASH_SEQ_STATUS seq_status;
317  UnresolvedTup unresolved;
318 
319  /*
320  * Write any remaining tuples in the UnresolvedTups table. If we have any
321  * left, they should in fact be dead, but let's err on the safe side.
322  */
323  hash_seq_init(&seq_status, state->rs_unresolved_tups);
324 
325  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326  {
327  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328  raw_heap_insert(state, unresolved->tuple);
329  }
330 
331  /* Write the last page, if any */
332  if (state->rs_buffer_valid)
333  {
334  if (state->rs_use_wal)
335  log_newpage(&state->rs_new_rel->rd_node,
336  MAIN_FORKNUM,
337  state->rs_blockno,
338  state->rs_buffer,
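339  true);
340  RelationOpenSmgr(state->rs_new_rel);
341 
342  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
343 
344  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
345  (char *) state->rs_buffer, true);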
346  }
347 
348  /*
349  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
350  * to ensure that the toast table gets fsync'd too.
351  *
352  * It's obvious that we must do this when not WAL-logging. It's less
353  * obvious that we have to do it even if we did WAL-log the pages. The
354  * reason is the same as in tablecmds.c's copy_relation_data(): we're
355  * writing data that's not in shared buffers, and so a CHECKPOINT
356  * occurring during the rewriteheap operation won't have fsync'd data we
357  * wrote before the checkpoint.
358  */
359  if (RelationNeedsWAL(state->rs_new_rel))
360  heap_sync(state->rs_new_rel);
361 
362  logical_end_heap_rewrite(state);
363 
364  /* Deleting the context frees everything */
365  MemoryContextDelete(state->rs_cxt);
366 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:946
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Relation rs_new_rel
Definition: rewriteheap.c:144
void heap_sync(Relation rel)
Definition: heapam.c:9202
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:460
ItemPointerData t_ctid
Definition: htup_details.h:150
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
MemoryContext rs_cxt
Definition: rewriteheap.c:158
RelFileNode rd_node
Definition: rel.h:85
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
#define RelationNeedsWAL(relation)
Definition: rel.h:505
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:628
BlockNumber rs_blockno
Definition: rewriteheap.c:146
void heap_xlog_logical_rewrite ( XLogReaderState *  r)

Definition at line 1117 of file rewriteheap.c.

References CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf(), xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1118 {
1119  char path[MAXPGPATH];
1120  int fd;
1121  xl_heap_rewrite_mapping *xlrec;
1122  uint32 len;
1123  char *data;
1124 
1125  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1126 
1127  snprintf(path, MAXPGPATH,
1128  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1129  xlrec->mapped_db, xlrec->mapped_rel,
1130  (uint32) (xlrec->start_lsn >> 32),
1131  (uint32) xlrec->start_lsn,
1132  xlrec->mapped_xid, XLogRecGetXid(r));
1133 
1134  fd = OpenTransientFile(path,
1135  O_CREAT | O_WRONLY | PG_BINARY);
1136  if (fd < 0)
1137  ereport(ERROR,
1138  (errcode_for_file_access(),
1139  errmsg("could not create file \"%s\": %m", path)));
1140 
1141  /*
1142  * Truncate all data that's not guaranteed to have been safely fsynced (by
1143  * previous record or by the last checkpoint).
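1144  */
1145  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1146  if (ftruncate(fd, xlrec->offset) != 0)
1147  ereport(ERROR,
1148  (errcode_for_file_access(),
1149  errmsg("could not truncate file \"%s\" to %u: %m",
1150  path, (uint32) xlrec->offset)));
1151  pgstat_report_wait_end();
1152 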
1153  /* now seek to the position we want to write our data to */
1154  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1155  ereport(ERROR,
1156  (errcode_for_file_access(),
1157  errmsg("could not seek to end of file \"%s\": %m",
1158  path)));
1159 
1160  data = XLogRecGetData(r) + sizeof(*xlrec);
1161 
1162  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1163 
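1164  /* write out tail end of mapping file (again) */
1165  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1166  if (write(fd, data, len) != len)
1167  ereport(ERROR,
1168  (errcode_for_file_access(),
1169  errmsg("could not write to file \"%s\": %m", path)));
1170  pgstat_report_wait_end();
1171 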
1172  /*
1173  * Now fsync all previously written data. We could improve things and only
1174  * do this for the last write to a file, but the required bookkeeping
1175  * doesn't seem worth the trouble.
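1176  */
1177  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1178  if (pg_fsync(fd) != 0)
1179  ereport(ERROR,
1180  (errcode_for_file_access(),
1181  errmsg("could not fsync file \"%s\": %m", path)));
1182  pgstat_report_wait_end();
1183 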
1184  CloseTransientFile(fd);
1185 }
#define write(a, b, c)
Definition: win32.h:14
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1044
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2167
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:258
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2337
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:224
TransactionId mapped_xid
Definition: heapam_xlog.h:362
#define ftruncate(a, b)
Definition: win32.h:59
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1220
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:338
static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 799 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

800 {
801  HASHCTL hash_ctl;
802  TransactionId logical_xmin;
803 
804  /*
805  * We only need to persist these mappings if the rewritten table can be
806  * accessed during logical decoding, if not, we can skip doing any
807  * additional work.
808  */
809  state->rs_logical_rewrite =
810  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
811 
812  if (!state->rs_logical_rewrite)
813  return;
814 
815  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
816 
817  /*
818  * If there are no logical slots in progress we don't need to do anything,
819  * there cannot be any remappings for relevant rows yet. The relation's
820  * lock protects us against races.
821  */
822  if (logical_xmin == InvalidTransactionId)
823  {
824  state->rs_logical_rewrite = false;
825  return;
826  }
827 
828  state->rs_logical_xmin = logical_xmin;
829  state->rs_begin_lsn = GetXLogInsertRecPtr();
830  state->rs_num_rewrite_mappings = 0;
831 
832  memset(&hash_ctl, 0, sizeof(hash_ctl));
833  hash_ctl.keysize = sizeof(TransactionId);
834  hash_ctl.entrysize = sizeof(RewriteMappingFile);
835  hash_ctl.hcxt = state->rs_cxt;
836 
837  state->rs_logical_mappings =
838  hash_create("Logical rewrite mapping",
839  128, /* arbitrary initial size */
840  &hash_ctl,
841  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
842 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:391
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11196
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:559
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:143
static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 946 of file rewriteheap.c.

References ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

947 {
948  HASH_SEQ_STATUS seq_status;
949  RewriteMappingFile *src;
950 
951  /* done, no logical rewrite in progress */
952  if (!state->rs_logical_rewrite)
953  return;
954 
955  /* writeout remaining in-memory entries */
956  if (state->rs_num_rewrite_mappings > 0)
957  logical_heap_rewrite_flush_mappings(state);
958 
959  /* Iterate over all mappings we have written and fsync the files. */
960  hash_seq_init(&seq_status, state->rs_logical_mappings);
961  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
962  {
963  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
964  ereport(ERROR,
965  (errcode_for_file_access(),
966  errmsg("could not fsync file \"%s\": %m", src->path)));
967  FileClose(src->vfd);
968  }
969  /* memory context cleanup will deal with the rest */
970 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:1853
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void FileClose(File file)
Definition: fd.c:1516
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 848 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

849 {
850  HASH_SEQ_STATUS seq_status;
851  RewriteMappingFile *src;
852  dlist_mutable_iter iter;
853 
854  Assert(state->rs_logical_rewrite);
855 
856  /* no logical rewrite in progress, no need to iterate over mappings */
857  if (state->rs_num_rewrite_mappings == 0)
858  return;
859 
860  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
861  state->rs_num_rewrite_mappings);
862 
863  hash_seq_init(&seq_status, state->rs_logical_mappings);
864  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
865  {
866  char *waldata;
867  char *waldata_start;
868  xl_heap_rewrite_mapping xlrec;
869  Oid dboid;
870  uint32 len;
871  int written;
872 
873  /* this file hasn't got any new mappings */
874  if (src->num_mappings == 0)
875  continue;
876 
877  if (state->rs_old_rel->rd_rel->relisshared)
878  dboid = InvalidOid;
879  else
880  dboid = MyDatabaseId;
881 
882  xlrec.num_mappings = src->num_mappings;
883  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
884  xlrec.mapped_xid = src->xid;
885  xlrec.mapped_db = dboid;
886  xlrec.offset = src->off;
887  xlrec.start_lsn = state->rs_begin_lsn;
888 
889  /* write all mappings consecutively */
890  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
891  waldata_start = waldata = palloc(len);
892 
893  /*
894  * collect data we need to write out, but don't modify ondisk data yet
895  */
896  dlist_foreach_modify(iter, &src->mappings)
897  {
898  RewriteMappingDataEntry *pmap;
899 
900  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
901 
902  memcpy(waldata, &pmap->map, sizeof(pmap->map));
903  waldata += sizeof(pmap->map);
904 
905  /* remove from the list and free */
906  dlist_delete(&pmap->node);
907  pfree(pmap);
908 
909  /* update bookkeeping */
910  state->rs_num_rewrite_mappings--;
911  src->num_mappings--;
912  }
913 
914  Assert(src->num_mappings == 0);
915  Assert(waldata == waldata_start + len);
916 
917  /*
918  * Note that we deviate from the usual WAL coding practices here,
919  * check the above "Logical rewrite support" comment for reasoning.
920  */
921  written = FileWrite(src->vfd, waldata_start, len,
922  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
923  if (written != len)
924  ereport(ERROR,
925  (errcode_for_file_access(),
926  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
927  written, len)));
928  src->off += len;
929 
930  XLogBeginInsert();
931  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
932  XLogRegisterData(waldata_start, len);
933 
934  /* write xlog record */
935  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
936 
937  pfree(waldata_start);
938  }
939  Assert(state->rs_num_rewrite_mappings == 0);
940 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:949
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
TransactionId xid
Definition: rewriteheap.c:205
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:258
int FileWrite(File file, char *buffer, int amount, uint32 wait_event_info)
Definition: fd.c:1732
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:362
#define Assert(condition)
Definition: c.h:681
dlist_head mappings
Definition: rewriteheap.c:209
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:143
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:416
static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1043 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1045 {
1046  ItemPointerData new_tid = new_tuple->t_self;
1047  TransactionId cutoff = state->rs_logical_xmin;
1048  TransactionId xmin;
1049  TransactionId xmax;
1050  bool do_log_xmin = false;
1051  bool do_log_xmax = false;
1052  LogicalRewriteMappingData map;
1053 
1054  /* no logical rewrite in progress, we don't need to log anything */
1055  if (!state->rs_logical_rewrite)
1056  return;
1057 
1058  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1059  /* use *GetUpdateXid to correctly deal with multixacts */
1060  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1061 
1062  /*
1063  * Log the mapping iff the tuple has been created recently.
1064  */
1065  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1066  do_log_xmin = true;
1067 
1068  if (!TransactionIdIsNormal(xmax))
1069  {
1070  /*
1071  * no xmax is set, can't have any permanent ones, so this check is
1072  * sufficient
1073  */
1074  }
1075  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1076  {
1077  /* only locked, we don't care */
1078  }
1079  else if (!TransactionIdPrecedes(xmax, cutoff))
1080  {
1081  /* tuple has been deleted recently, log */
1082  do_log_xmax = true;
1083  }
1084 
1085  /* if neither needs to be logged, we're done */
1086  if (!do_log_xmin && !do_log_xmax)
1087  return;
1088 
1089  /* fill out mapping information */
1090  map.old_node = state->rs_old_rel->rd_node;
1091  map.old_tid = old_tid;
1092  map.new_node = state->rs_new_rel->rd_node;
1093  map.new_tid = new_tid;
1094 
1095  /* ---
1096  * Now persist the mapping for the individual xids that are affected. We
1097  * need to log for both xmin and xmax if they aren't the same transaction
1098  * since the mapping files are per "affected" xid.
1099  * We don't muster all that much effort detecting whether xmin and xmax
1100  * are actually the same transaction, we just check whether the xid is the
1101  * same disregarding subtransactions. Logging too much is relatively
1102  * harmless and we could never do the check fully since subtransaction
1103  * data is thrown away during restarts.
1104  * ---
1105  */
1106  if (do_log_xmin)
1107  logical_rewrite_log_mapping(state, xmin, &map);
1108  /* separately log mapping for xmax unless it'd be redundant */
1109  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1110  logical_rewrite_log_mapping(state, xmax, &map);
1111 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:359
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:391
Relation rs_new_rel
Definition: rewriteheap.c:144
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:976
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:216
RelFileNode rd_node
Definition: rel.h:85
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
Relation rs_old_rel
Definition: rewriteheap.c:143
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39
/*
 * Queue one old-tid -> new-tid mapping for the given 'mapped' xid.
 *
 * state - rewrite state; supplies the per-xid hash table
 *         (rs_logical_mappings), the memory context used for the queued
 *         entry (rs_cxt) and the running count of queued entries.
 * xid   - transaction the mapping is filed under.
 * map   - mapping to record; it is copied, so the caller keeps ownership.
 *
 * The first mapping seen for an xid creates that xid's mapping file under
 * pg_logical/mappings/.  Failure to create the file raises an ERROR.
 *
 * Called from logical_rewrite_heap_tuple().
 */
static void
logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
							LogicalRewriteMappingData *map)
{
	RewriteMappingFile *src;
	RewriteMappingDataEntry *pmap;
	Oid			relid;
	bool		found;

	relid = RelationGetRelid(state->rs_old_rel);

	/* look for existing mappings for this 'mapped' xid */
	src = hash_search(state->rs_logical_mappings, &xid,
					  HASH_ENTER, &found);

	/*
	 * We haven't yet had the need to map anything for this xid, create
	 * per-xid data structures.
	 */
	if (!found)
	{
		char		path[MAXPGPATH];
		Oid			dboid;

		/* shared relations are filed under InvalidOid, not our database */
		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		snprintf(path, MAXPGPATH,
				 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
				 dboid, relid,
				 (uint32) (state->rs_begin_lsn >> 32),
				 (uint32) state->rs_begin_lsn,
				 xid, GetCurrentTransactionId());

		dlist_init(&src->mappings);
		src->num_mappings = 0;
		src->off = 0;
		memcpy(src->path, path, sizeof(path));

		/* O_EXCL: the file must not exist yet */
		src->vfd = PathNameOpenFile(path,
									O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
		if (src->vfd < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not create file \"%s\": %m", path)));
	}

	/* queue a private copy of the mapping on this xid's list */
	pmap = MemoryContextAlloc(state->rs_cxt,
							  sizeof(RewriteMappingDataEntry));
	memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
	dlist_push_tail(&src->mappings, &pmap->node);
	src->num_mappings++;
	state->rs_num_rewrite_mappings++;

	/*
	 * Write out buffer every time we've too many in-memory entries across all
	 * mapping files.
	 */
	if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
		logical_heap_rewrite_flush_mappings(state);
}
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1315
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1044
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:258
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
MemoryContext rs_cxt
Definition: rewriteheap.c:158
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:209
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:848
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
Relation rs_old_rel
Definition: rewriteheap.c:143
#define RelationGetRelid(relation)
Definition: rel.h:416
/*
 * Insert a tuple into the new heap through the rewrite state's page buffer.
 *
 * state - rewrite state; rs_buffer holds the page being filled and
 *         rs_blockno its block number in the new relation.
 * tup   - tuple to store.  On return tup->t_self is set to the position
 *         where the tuple was stored.
 *
 * If the buffered page cannot take the tuple (allowing for the fillfactor
 * reserve), the page is WAL-logged when rs_use_wal is set, checksummed and
 * written out with smgrextend(), and a fresh page is started.  Raises an
 * ERROR if the tuple exceeds MaxHeapTupleSize or cannot be added to the
 * page.
 *
 * Called from end_heap_rewrite() and rewrite_heap_tuple().
 */
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
	Page		page = state->rs_buffer;
	Size		pageFreeSpace,
				saveFreeSpace;
	Size		len;
	OffsetNumber newoff;
	HeapTuple	heaptup;

	/*
	 * If the new tuple is too big for storage or contains already toasted
	 * out-of-line attributes from some other relation, invoke the toaster.
	 *
	 * Note: below this point, heaptup is the data we actually intend to store
	 * into the relation; tup is the caller's original untoasted data.
	 */
	if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
	{
		/* toast table entries should never be recursively toasted */
		Assert(!HeapTupleHasExternal(tup));
		heaptup = tup;
	}
	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
										 HEAP_INSERT_SKIP_FSM |
										 (state->rs_use_wal ?
										  0 : HEAP_INSERT_SKIP_WAL));
	else
		heaptup = tup;

	len = MAXALIGN(heaptup->t_len); /* be conservative */

	/*
	 * If we're gonna fail for oversize tuple, do it right away
	 */
	if (len > MaxHeapTupleSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("row is too big: size %zu, maximum size %zu",
						len, MaxHeapTupleSize)));

	/* Compute desired extra freespace due to fillfactor option */
	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
												   HEAP_DEFAULT_FILLFACTOR);

	/* Now we can check to see if there's enough free space already. */
	if (state->rs_buffer_valid)
	{
		pageFreeSpace = PageGetHeapFreeSpace(page);

		if (len + saveFreeSpace > pageFreeSpace)
		{
			/* Doesn't fit, so write out the existing page */

			/* XLOG stuff */
			if (state->rs_use_wal)
				log_newpage(&state->rs_new_rel->rd_node,
							MAIN_FORKNUM,
							state->rs_blockno,
							page,
							true);

			/*
			 * Now write the page.  We pass skipFsync = true even if it's not
			 * a temp table, because there's no need for smgr to schedule an
			 * fsync for this write; we'll do it ourselves in
			 * end_heap_rewrite.
			 */
			RelationOpenSmgr(state->rs_new_rel);

			PageSetChecksumInplace(page, state->rs_blockno);

			smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
					   state->rs_blockno, (char *) page, true);

			state->rs_blockno++;
			state->rs_buffer_valid = false;
		}
	}

	if (!state->rs_buffer_valid)
	{
		/* Initialize a new empty page */
		PageInit(page, BLCKSZ, 0);
		state->rs_buffer_valid = true;
	}

	/* And now we can insert the tuple into the page */
	newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
						 InvalidOffsetNumber, false, true);
	if (newoff == InvalidOffsetNumber)
		elog(ERROR, "failed to add tuple");

	/* Update caller's t_self to the actual position where it was stored */
	ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);

	/*
	 * Insert the correct position into CTID of the stored tuple, too, if the
	 * caller didn't supply a valid CTID.
	 */
	if (!ItemPointerIsValid(&tup->t_data->t_ctid))
	{
		ItemId		newitemid;
		HeapTupleHeader onpage_tup;

		newitemid = PageGetItemId(page, newoff);
		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);

		onpage_tup->t_ctid = tup->t_self;
	}

	/* If heaptup is a private copy, release it. */
	if (heaptup != tup)
		heap_freetuple(heaptup);
}
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:60
#define TOAST_TUPLE_THRESHOLD
Definition: tuptoaster.h:55
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: tuptoaster.c:534
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:144
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:412
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:460
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:666
ItemPointerData t_ctid
Definition: htup_details.h:150
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
#define ereport(elevel, rest)
Definition: elog.h:122
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:307
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:231
#define RELKIND_TOASTVALUE
Definition: pg_class.h:163
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:85
#define Assert(condition)
Definition: c.h:681
size_t Size
Definition: c.h:350
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1199
#define MAXALIGN(LEN)
Definition: c.h:576
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:674
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog
Definition: elog.h:219
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:286
BlockNumber rs_blockno
Definition: rewriteheap.c:146
#define PageGetItem(page, itemId)
Definition: bufpage.h:336
Pointer Page
Definition: bufpage.h:74
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:105
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:41
/*
 * Register a dead tuple with the rewrite state.
 *
 * If an earlier tuple in the update chain that points to this one is still
 * waiting in rs_unresolved_tups, that earlier tuple is dead as well: the
 * simple xmax < OldestXmin test in HeapTupleSatisfiesVacuum just wasn't
 * enough to detect it.  (This happens when xmin of a tuple is greater than
 * xmax, which sounds counter-intuitive but is perfectly valid.)  In that
 * case the stashed tuple and its hash entry are discarded.
 *
 * The reverse order, dead tuple first and then the recently dead one that
 * points to it, is not detected.  That merely leaves unmatched entries in
 * the unresolved-tuples hash table at the end, which can happen anyway
 * because a vacuum might have removed the dead tuple in the chain before
 * us.
 *
 * Returns true if a stashed predecessor was found and dropped, false
 * otherwise.
 *
 * Called from copy_heap_data().
 */
bool
rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
{
	TidHashKey	key;
	UnresolvedTup pending;
	bool		removed;

	/* zero the whole key first, then fill in its two fields */
	memset(&key, 0, sizeof(key));
	key.tid = old_tuple->t_self;
	key.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);

	pending = hash_search(state->rs_unresolved_tups, &key,
						  HASH_FIND, NULL);

	/* Nothing stashed that points at this tuple. */
	if (pending == NULL)
		return false;

	/* Need to free the contained tuple as well as the hashtable entry */
	heap_freetuple(pending->tuple);
	hash_search(state->rs_unresolved_tups, &key,
				HASH_REMOVE, &removed);
	Assert(removed);

	return true;
}
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
HeapTupleHeader t_data
Definition: htup.h:67
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:681
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
TransactionId xmin
Definition: rewriteheap.c:175
/*
 * Add a tuple to the new heap, preserving update-chain links.
 *
 * state     - rewrite state.
 * old_tuple - the tuple as it exists in the old heap; its visibility fields,
 *             t_self and t_ctid are read.
 * new_tuple - the tuple to store.  Its header is overwritten in place
 *             (visibility info, infomask bits, ctid), so it must not alias
 *             the original tuple.
 *
 * A tuple whose successor has not been copied yet is stashed in
 * rs_unresolved_tups and written later, once the successor's new location
 * is known.  A tuple whose predecessor has not been seen yet records its
 * new location in rs_old_new_tid_map.
 *
 * Called from reform_and_rewrite_tuple().
 */
void
rewrite_heap_tuple(RewriteState state,
				   HeapTuple old_tuple, HeapTuple new_tuple)
{
	MemoryContext old_cxt;
	ItemPointerData old_tid;
	TidHashKey	hashkey;
	bool		found;
	bool		free_new;

	old_cxt = MemoryContextSwitchTo(state->rs_cxt);

	/*
	 * Copy the original tuple's visibility information into new_tuple.
	 *
	 * XXX we might later need to copy some t_infomask2 bits, too?  Right now,
	 * we intentionally clear the HOT status bits.
	 */
	memcpy(&new_tuple->t_data->t_choice.t_heap,
		   &old_tuple->t_data->t_choice.t_heap,
		   sizeof(HeapTupleFields));

	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
	new_tuple->t_data->t_infomask |=
		old_tuple->t_data->t_infomask & HEAP_XACT_MASK;

	/*
	 * While we have our hands on the tuple, we may as well freeze any
	 * eligible xmin or xmax, so that future VACUUM effort can be saved.
	 */
	heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
					  state->rs_cutoff_multi);

	/*
	 * Invalid ctid means that ctid should point to the tuple itself. We'll
	 * override it later if the tuple is part of an update chain.
	 */
	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);

	/*
	 * If the tuple has been updated, check the old-to-new mapping hash table.
	 */
	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
		!(ItemPointerEquals(&(old_tuple->t_self),
							&(old_tuple->t_data->t_ctid))))
	{
		OldToNewMapping mapping;

		memset(&hashkey, 0, sizeof(hashkey));
		hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
		hashkey.tid = old_tuple->t_data->t_ctid;

		mapping = (OldToNewMapping)
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_FIND, NULL);

		if (mapping != NULL)
		{
			/*
			 * We've already copied the tuple that t_ctid points to, so we can
			 * set the ctid of this tuple to point to the new location, and
			 * insert it right away.
			 */
			new_tuple->t_data->t_ctid = mapping->new_tid;

			/* We don't need the mapping entry anymore */
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_REMOVE, &found);
			Assert(found);
		}
		else
		{
			/*
			 * We haven't seen the tuple t_ctid points to yet. Stash this
			 * tuple into unresolved_tups to be written later.
			 */
			UnresolvedTup unresolved;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_ENTER, &found);
			Assert(!found);

			unresolved->old_tid = old_tuple->t_self;
			unresolved->tuple = heap_copytuple(new_tuple);

			/*
			 * We can't do anything more now, since we don't know where the
			 * tuple will be written.
			 */
			MemoryContextSwitchTo(old_cxt);
			return;
		}
	}

	/*
	 * Now we will write the tuple, and then check to see if it is the B tuple
	 * in any new or known pair.  When we resolve a known pair, we will be
	 * able to write that pair's A tuple, and then we have to check if it
	 * resolves some other pair.  Hence, we need a loop here.
	 */
	old_tid = old_tuple->t_self;
	free_new = false;

	for (;;)
	{
		ItemPointerData new_tid;

		/* Insert the tuple and find out where it's put in new_heap */
		raw_heap_insert(state, new_tuple);
		new_tid = new_tuple->t_self;

		logical_rewrite_heap_tuple(state, old_tid, new_tuple);

		/*
		 * If the tuple is the updated version of a row, and the prior version
		 * wouldn't be DEAD yet, then we need to either resolve the prior
		 * version (if it's waiting in rs_unresolved_tups), or make an entry
		 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
		 * previous tuple's xmax would equal this one's xmin, so it's
		 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
		 */
		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
								   state->rs_oldest_xmin))
		{
			/*
			 * Okay, this is B in an update pair.  See if we've seen A.
			 */
			UnresolvedTup unresolved;

			memset(&hashkey, 0, sizeof(hashkey));
			hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
			hashkey.tid = old_tid;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_FIND, NULL);

			if (unresolved != NULL)
			{
				/*
				 * We have seen and memorized the previous tuple already. Now
				 * that we know where we inserted the tuple its t_ctid points
				 * to, fix its t_ctid and insert it to the new heap.
				 */
				if (free_new)
					heap_freetuple(new_tuple);
				new_tuple = unresolved->tuple;
				free_new = true;
				old_tid = unresolved->old_tid;
				new_tuple->t_data->t_ctid = new_tid;

				/*
				 * We don't need the hash entry anymore, but don't free its
				 * tuple just yet.
				 */
				hash_search(state->rs_unresolved_tups, &hashkey,
							HASH_REMOVE, &found);
				Assert(found);

				/* loop back to insert the previous tuple in the chain */
				continue;
			}
			else
			{
				/*
				 * Remember the new tid of this tuple. We'll use it to set the
				 * ctid when we find the previous tuple in the chain.
				 */
				OldToNewMapping mapping;

				mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
									  HASH_ENTER, &found);
				Assert(!found);

				mapping->new_tid = new_tid;
			}
		}

		/* Done with this (chain of) tuples, for now */
		if (free_new)
			heap_freetuple(new_tuple);
		break;
	}

	MemoryContextSwitchTo(old_cxt);
}
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:359
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
Definition: tqual.c:1605
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:611
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:146
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1043
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:269
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
#define HEAP_UPDATED
Definition: htup_details.h:195
ItemPointerData old_tid
Definition: rewriteheap.c:185
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6887
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData new_tid
Definition: rewriteheap.c:194
#define HEAP_XMAX_INVALID
Definition: htup_details.h:193
ItemPointerData t_ctid
Definition: htup_details.h:150
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
MemoryContext rs_cxt
Definition: rewriteheap.c:158
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:681
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
TransactionId xmin
Definition: rewriteheap.c:175
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:628
#define HEAP_XACT_MASK
Definition: htup_details.h:204
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:197