PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
#include "storage/procarray.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

Function Documentation

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 248 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by copy_heap_data().

251 {
253  MemoryContext rw_cxt;
254  MemoryContext old_cxt;
255  HASHCTL hash_ctl;
256 
257  /*
258  * To ease cleanup, make a separate context that will contain the
259  * RewriteState struct itself plus all subsidiary data.
260  */
262  "Table rewrite",
264  old_cxt = MemoryContextSwitchTo(rw_cxt);
265 
266  /* Create and fill in the state struct */
267  state = palloc0(sizeof(RewriteStateData));
268 
269  state->rs_old_rel = old_heap;
270  state->rs_new_rel = new_heap;
271  state->rs_buffer = (Page) palloc(BLCKSZ);
272  /* new_heap needn't be empty, just locked */
273  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274  state->rs_buffer_valid = false;
275  state->rs_use_wal = use_wal;
276  state->rs_oldest_xmin = oldest_xmin;
277  state->rs_freeze_xid = freeze_xid;
278  state->rs_cutoff_multi = cutoff_multi;
279  state->rs_cxt = rw_cxt;
280 
281  /* Initialize hash tables used to track update chains */
282  memset(&hash_ctl, 0, sizeof(hash_ctl));
283  hash_ctl.keysize = sizeof(TidHashKey);
284  hash_ctl.entrysize = sizeof(UnresolvedTupData);
285  hash_ctl.hcxt = state->rs_cxt;
286 
287  state->rs_unresolved_tups =
288  hash_create("Rewrite / Unresolved ctids",
289  128, /* arbitrary initial size */
290  &hash_ctl,
292 
293  hash_ctl.entrysize = sizeof(OldToNewMappingData);
294 
295  state->rs_old_new_tid_map =
296  hash_create("Rewrite / Old to new tid map",
297  128, /* arbitrary initial size */
298  &hash_ctl,
300 
301  MemoryContextSwitchTo(old_cxt);
302 
304 
305  return state;
306 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:144
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:199
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:799
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:849
Relation rs_old_rel
Definition: rewriteheap.c:143
BlockNumber rs_blockno
Definition: rewriteheap.c:146
Pointer Page
Definition: bufpage.h:74
void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1200 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, NULL, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf(), unlink(), and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1201 {
1202  XLogRecPtr cutoff;
1203  XLogRecPtr redo;
1204  DIR *mappings_dir;
1205  struct dirent *mapping_de;
1206  char path[MAXPGPATH + 20];
1207 
1208  /*
1209  * We start of with a minimum of the last redo pointer. No new decoding
1210  * slot will start before that, so that's a safe upper bound for removal.
1211  */
1212  redo = GetRedoRecPtr();
1213 
1214  /* now check for the restart ptrs from existing slots */
1216 
1217  /* don't start earlier than the restart lsn */
1218  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1219  cutoff = redo;
1220 
1221  mappings_dir = AllocateDir("pg_logical/mappings");
1222  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1223  {
1224  struct stat statbuf;
1225  Oid dboid;
1226  Oid relid;
1227  XLogRecPtr lsn;
1228  TransactionId rewrite_xid;
1229  TransactionId create_xid;
1230  uint32 hi,
1231  lo;
1232 
1233  if (strcmp(mapping_de->d_name, ".") == 0 ||
1234  strcmp(mapping_de->d_name, "..") == 0)
1235  continue;
1236 
1237  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1238  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1239  continue;
1240 
1241  /* Skip over files that cannot be ours. */
1242  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1243  continue;
1244 
1245  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1246  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1247  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1248 
1249  lsn = ((uint64) hi) << 32 | lo;
1250 
1251  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1252  {
1253  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1254  if (unlink(path) < 0)
1255  ereport(ERROR,
1257  errmsg("could not remove file \"%s\": %m", path)));
1258  }
1259  else
1260  {
1261  int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1262 
1263  /*
1264  * The file cannot vanish due to concurrency since this function
1265  * is the only one removing logical mappings and it's run while
1266  * CheckpointLock is held exclusively.
1267  */
1268  if (fd < 0)
1269  ereport(ERROR,
1271  errmsg("could not open file \"%s\": %m", path)));
1272 
1273  /*
1274  * We could try to avoid fsyncing files that either haven't
1275  * changed or have only been created since the checkpoint's start,
1276  * but it's currently not deemed worth the effort.
1277  */
1279  if (pg_fsync(fd) != 0)
1280  ereport(ERROR,
1282  errmsg("could not fsync file \"%s\": %m", path)));
1284  CloseTransientFile(fd);
1285  }
1286  }
1287  FreeDir(mappings_dir);
1288 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:397
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:723
unsigned int uint32
Definition: c.h:268
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8162
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
int pg_fsync(int fd)
Definition: fd.c:333
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define lstat(path, sb)
Definition: win32.h:262
int FreeDir(DIR *dir)
Definition: fd.c:2444
void end_heap_rewrite ( RewriteState  state)

Definition at line 314 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), NULL, PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by copy_heap_data().

315 {
316  HASH_SEQ_STATUS seq_status;
317  UnresolvedTup unresolved;
318 
319  /*
320  * Write any remaining tuples in the UnresolvedTups table. If we have any
321  * left, they should in fact be dead, but let's err on the safe side.
322  */
323  hash_seq_init(&seq_status, state->rs_unresolved_tups);
324 
325  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326  {
327  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328  raw_heap_insert(state, unresolved->tuple);
329  }
330 
331  /* Write the last page, if any */
332  if (state->rs_buffer_valid)
333  {
334  if (state->rs_use_wal)
335  log_newpage(&state->rs_new_rel->rd_node,
336  MAIN_FORKNUM,
337  state->rs_blockno,
338  state->rs_buffer,
339  true);
341 
343 
345  (char *) state->rs_buffer, true);
346  }
347 
348  /*
349  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
350  * to ensure that the toast table gets fsync'd too.
351  *
352  * It's obvious that we must do this when not WAL-logging. It's less
353  * obvious that we have to do it even if we did WAL-log the pages. The
354  * reason is the same as in tablecmds.c's copy_relation_data(): we're
355  * writing data that's not in shared buffers, and so a CHECKPOINT
356  * occurring during the rewriteheap operation won't have fsync'd data we
357  * wrote before the checkpoint.
358  */
359  if (RelationNeedsWAL(state->rs_new_rel))
360  heap_sync(state->rs_new_rel);
361 
363 
364  /* Deleting the context frees everything */
365  MemoryContextDelete(state->rs_cxt);
366 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:946
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Relation rs_new_rel
Definition: rewriteheap.c:144
void heap_sync(Relation rel)
Definition: heapam.c:9084
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:460
ItemPointerData t_ctid
Definition: htup_details.h:150
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
MemoryContext rs_cxt
Definition: rewriteheap.c:158
RelFileNode rd_node
Definition: rel.h:85
#define NULL
Definition: c.h:229
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1351
#define RelationNeedsWAL(relation)
Definition: rel.h:505
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1341
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:628
BlockNumber rs_blockno
Definition: rewriteheap.c:146
void heap_xlog_logical_rewrite ( XLogReaderState r)

Definition at line 1118 of file rewriteheap.c.

References CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf(), xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1119 {
1120  char path[MAXPGPATH];
1121  int fd;
1122  xl_heap_rewrite_mapping *xlrec;
1123  uint32 len;
1124  char *data;
1125 
1126  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1127 
1128  snprintf(path, MAXPGPATH,
1129  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1130  xlrec->mapped_db, xlrec->mapped_rel,
1131  (uint32) (xlrec->start_lsn >> 32),
1132  (uint32) xlrec->start_lsn,
1133  xlrec->mapped_xid, XLogRecGetXid(r));
1134 
1135  fd = OpenTransientFile(path,
1136  O_CREAT | O_WRONLY | PG_BINARY,
1137  S_IRUSR | S_IWUSR);
1138  if (fd < 0)
1139  ereport(ERROR,
1141  errmsg("could not create file \"%s\": %m", path)));
1142 
1143  /*
1144  * Truncate all data that's not guaranteed to have been safely fsynced (by
1145  * previous record or by the last checkpoint).
1146  */
1148  if (ftruncate(fd, xlrec->offset) != 0)
1149  ereport(ERROR,
1151  errmsg("could not truncate file \"%s\" to %u: %m",
1152  path, (uint32) xlrec->offset)));
1154 
1155  /* now seek to the position we want to write our data to */
1156  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1157  ereport(ERROR,
1159  errmsg("could not seek to end of file \"%s\": %m",
1160  path)));
1161 
1162  data = XLogRecGetData(r) + sizeof(*xlrec);
1163 
1164  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1165 
1166  /* write out tail end of mapping file (again) */
1168  if (write(fd, data, len) != len)
1169  ereport(ERROR,
1171  errmsg("could not write to file \"%s\": %m", path)));
1173 
1174  /*
1175  * Now fsync all previously written data. We could improve things and only
1176  * do this for the last write to a file, but the required bookkeeping
1177  * doesn't seem worth the trouble.
1178  */
1180  if (pg_fsync(fd) != 0)
1181  ereport(ERROR,
1183  errmsg("could not fsync file \"%s\": %m", path)));
1185 
1186  CloseTransientFile(fd);
1187 }
#define write(a, b, c)
Definition: win32.h:14
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
#define XLogRecGetData(decoder)
Definition: xlogreader.h:220
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2305
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:218
TransactionId mapped_xid
Definition: heapam_xlog.h:362
#define ftruncate(a, b)
Definition: win32.h:59
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:333
static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 799 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, NULL, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

800 {
801  HASHCTL hash_ctl;
802  TransactionId logical_xmin;
803 
804  /*
805  * We only need to persist these mappings if the rewritten table can be
806  * accessed during logical decoding, if not, we can skip doing any
807  * additional work.
808  */
809  state->rs_logical_rewrite =
811 
812  if (!state->rs_logical_rewrite)
813  return;
814 
815  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
816 
817  /*
818  * If there are no logical slots in progress we don't need to do anything,
819  * there cannot be any remappings for relevant rows yet. The relation's
820  * lock protects us against races.
821  */
822  if (logical_xmin == InvalidTransactionId)
823  {
824  state->rs_logical_rewrite = false;
825  return;
826  }
827 
828  state->rs_logical_xmin = logical_xmin;
830  state->rs_num_rewrite_mappings = 0;
831 
832  memset(&hash_ctl, 0, sizeof(hash_ctl));
833  hash_ctl.keysize = sizeof(TransactionId);
834  hash_ctl.entrysize = sizeof(RewriteMappingFile);
835  hash_ctl.hcxt = state->rs_cxt;
836 
837  state->rs_logical_mappings =
838  hash_create("Logical rewrite mapping",
839  128, /* arbitrary initial size */
840  &hash_ctl,
842 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:397
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2985
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11107
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:559
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
#define NULL
Definition: c.h:229
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:143
static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 946 of file rewriteheap.c.

References ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), NULL, RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

947 {
948  HASH_SEQ_STATUS seq_status;
949  RewriteMappingFile *src;
950 
951  /* done, no logical rewrite in progress */
952  if (!state->rs_logical_rewrite)
953  return;
954 
955  /* writeout remaining in-memory entries */
956  if (state->rs_num_rewrite_mappings > 0)
958 
959  /* Iterate over all mappings we have written and fsync the files. */
960  hash_seq_init(&seq_status, state->rs_logical_mappings);
961  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
962  {
964  ereport(ERROR,
966  errmsg("could not fsync file \"%s\": %m", src->path)));
967  FileClose(src->vfd);
968  }
969  /* memory context cleanup will deal with the rest */
970 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:1830
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void FileClose(File file)
Definition: fd.c:1493
#define NULL
Definition: c.h:229
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1351
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1341
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 848 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, NULL, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

849 {
850  HASH_SEQ_STATUS seq_status;
851  RewriteMappingFile *src;
852  dlist_mutable_iter iter;
853 
854  Assert(state->rs_logical_rewrite);
855 
856  /* no logical rewrite in progress, no need to iterate over mappings */
857  if (state->rs_num_rewrite_mappings == 0)
858  return;
859 
860  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
861  state->rs_num_rewrite_mappings);
862 
863  hash_seq_init(&seq_status, state->rs_logical_mappings);
864  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
865  {
866  char *waldata;
867  char *waldata_start;
869  Oid dboid;
870  uint32 len;
871  int written;
872 
873  /* this file hasn't got any new mappings */
874  if (src->num_mappings == 0)
875  continue;
876 
877  if (state->rs_old_rel->rd_rel->relisshared)
878  dboid = InvalidOid;
879  else
880  dboid = MyDatabaseId;
881 
882  xlrec.num_mappings = src->num_mappings;
883  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
884  xlrec.mapped_xid = src->xid;
885  xlrec.mapped_db = dboid;
886  xlrec.offset = src->off;
887  xlrec.start_lsn = state->rs_begin_lsn;
888 
889  /* write all mappings consecutively */
890  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
891  waldata_start = waldata = palloc(len);
892 
893  /*
894  * collect data we need to write out, but don't modify ondisk data yet
895  */
896  dlist_foreach_modify(iter, &src->mappings)
897  {
899 
900  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
901 
902  memcpy(waldata, &pmap->map, sizeof(pmap->map));
903  waldata += sizeof(pmap->map);
904 
905  /* remove from the list and free */
906  dlist_delete(&pmap->node);
907  pfree(pmap);
908 
909  /* update bookkeeping */
910  state->rs_num_rewrite_mappings--;
911  src->num_mappings--;
912  }
913 
914  Assert(src->num_mappings == 0);
915  Assert(waldata == waldata_start + len);
916 
917  /*
918  * Note that we deviate from the usual WAL coding practices here,
919  * check the above "Logical rewrite support" comment for reasoning.
920  */
921  written = FileWrite(src->vfd, waldata_start, len,
923  if (written != len)
924  ereport(ERROR,
926  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
927  written, len)));
928  src->off += len;
929 
930  XLogBeginInsert();
931  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
932  XLogRegisterData(waldata_start, len);
933 
934  /* write xlog record */
935  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
936 
937  pfree(waldata_start);
938  }
939  Assert(state->rs_num_rewrite_mappings == 0);
940 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
TransactionId xid
Definition: rewriteheap.c:205
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
int FileWrite(File file, char *buffer, int amount, uint32 wait_event_info)
Definition: fd.c:1709
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:362
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
dlist_head mappings
Definition: rewriteheap.c:209
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1351
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1341
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:143
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:416
static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1044 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1046 {
1047  ItemPointerData new_tid = new_tuple->t_self;
1048  TransactionId cutoff = state->rs_logical_xmin;
1049  TransactionId xmin;
1050  TransactionId xmax;
1051  bool do_log_xmin = false;
1052  bool do_log_xmax = false;
1054 
1055  /* no logical rewrite in progress, we don't need to log anything */
1056  if (!state->rs_logical_rewrite)
1057  return;
1058 
1059  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1060  /* use *GetUpdateXid to correctly deal with multixacts */
1061  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1062 
1063  /*
1064  * Log the mapping iff the tuple has been created recently.
1065  */
1066  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1067  do_log_xmin = true;
1068 
1069  if (!TransactionIdIsNormal(xmax))
1070  {
1071  /*
1072  * no xmax is set, can't have any permanent ones, so this check is
1073  * sufficient
1074  */
1075  }
1076  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1077  {
1078  /* only locked, we don't care */
1079  }
1080  else if (!TransactionIdPrecedes(xmax, cutoff))
1081  {
1082  /* tuple has been deleted recently, log */
1083  do_log_xmax = true;
1084  }
1085 
1086  /* if neither needs to be logged, we're done */
1087  if (!do_log_xmin && !do_log_xmax)
1088  return;
1089 
1090  /* fill out mapping information */
1091  map.old_node = state->rs_old_rel->rd_node;
1092  map.old_tid = old_tid;
1093  map.new_node = state->rs_new_rel->rd_node;
1094  map.new_tid = new_tid;
1095 
1096  /* ---
1097  * Now persist the mapping for the individual xids that are affected. We
1098  * need to log for both xmin and xmax if they aren't the same transaction
1099  * since the mapping files are per "affected" xid.
1100  * We don't muster all that much effort detecting whether xmin and xmax
1101  * are actually the same transaction, we just check whether the xid is the
1102  * same disregarding subtransactions. Logging too much is relatively
1103  * harmless and we could never do the check fully since subtransaction
1104  * data is thrown away during restarts.
1105  * ---
1106  */
1107  if (do_log_xmin)
1108  logical_rewrite_log_mapping(state, xmin, &map);
1109  /* separately log mapping for xmax unless it'd be redundant */
1110  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1111  logical_rewrite_log_mapping(state, xmax, &map);
1112 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:359
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:397
Relation rs_new_rel
Definition: rewriteheap.c:144
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:976
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:216
RelFileNode rd_node
Definition: rel.h:85
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
Relation rs_old_rel
Definition: rewriteheap.c:143
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39
/*
 * logical_rewrite_log_mapping
 *		Remember an old-to-new tuple mapping that is relevant to 'xid'.
 *
 * Mappings are grouped per xid in the state->rs_logical_mappings hash table.
 * The first time 'xid' is seen during this rewrite, a mapping file is created
 * under pg_logical/mappings/.  Its name is built with LOGICAL_REWRITE_FORMAT
 * from:
 *   - the database OID (InvalidOid for a shared relation),
 *   - the old relation's OID,
 *   - state->rs_begin_lsn,
 *   - 'xid',
 *   - the current transaction's xid.
 * The file is opened with O_CREAT | O_EXCL, so an already existing file of
 * that name is an error.
 *
 * A copy of *map, allocated in state->rs_cxt, is then appended to the
 * in-memory list kept for 'xid'.  When the running count
 * state->rs_num_rewrite_mappings reaches 1000, the function calls
 * logical_heap_rewrite_flush_mappings() to write out the buffered entries.
 *
 * Raises ERROR if the mapping file cannot be created.
 *
 * Defined at line 976 of rewriteheap.c; referenced by
 * logical_rewrite_heap_tuple().
 */
static void
logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
							LogicalRewriteMappingData *map)
{
	RewriteMappingFile *src;
	RewriteMappingDataEntry *pmap;
	Oid			relid;
	bool		found;

	relid = RelationGetRelid(state->rs_old_rel);

	/* look for existing mappings for this 'mapped' xid */
	src = hash_search(state->rs_logical_mappings, &xid,
					  HASH_ENTER, &found);

	/*
	 * We haven't yet had the need to map anything for this xid, create
	 * per-xid data structures.
	 */
	if (!found)
	{
		char		path[MAXPGPATH];
		Oid			dboid;

		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		snprintf(path, MAXPGPATH,
				 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
				 dboid, relid,
				 (uint32) (state->rs_begin_lsn >> 32),
				 (uint32) state->rs_begin_lsn,
				 xid, GetCurrentTransactionId());

		dlist_init(&src->mappings);
		src->num_mappings = 0;
		src->off = 0;
		memcpy(src->path, path, sizeof(path));
		src->vfd = PathNameOpenFile(path,
									O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
									S_IRUSR | S_IWUSR);
		if (src->vfd < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not create file \"%s\": %m", path)));
	}

	/* copy the mapping into state->rs_cxt and queue it for this xid */
	pmap = MemoryContextAlloc(state->rs_cxt,
							  sizeof(RewriteMappingDataEntry));
	memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
	dlist_push_tail(&src->mappings, &pmap->node);
	src->num_mappings++;
	state->rs_num_rewrite_mappings++;

	/*
	 * Write out buffer every time we've too many in-memory entries across all
	 * mapping files.
	 */
	if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
		logical_heap_rewrite_flush_mappings(state);
}
File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:1303
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1038
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
MemoryContext rs_cxt
Definition: rewriteheap.c:158
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:209
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:848
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
Relation rs_old_rel
Definition: rewriteheap.c:143
#define RelationGetRelid(relation)
Definition: rel.h:416
/*
 * raw_heap_insert
 *		Add a tuple to the new heap being built by a rewrite.
 *
 * Tuples are collected in a private page image (state->rs_buffer) that is
 * written directly with smgrextend().
 *
 * Steps:
 *   1. Toast the tuple first if it has external attributes or exceeds
 *      TOAST_TUPLE_THRESHOLD.  This is never done when the new relation is
 *      itself a toast table.
 *   2. If the current page cannot hold the tuple plus the fillfactor
 *      reserve, finish that page:
 *        - WAL-log it with log_newpage() when state->rs_use_wal,
 *        - checksum it,
 *        - append it at state->rs_blockno,
 *        - advance rs_blockno.
 *   3. Start a fresh page when none is being built.
 *
 * On return, tup->t_self holds the tuple's location in the new heap.  If the
 * caller left tup's t_ctid invalid, the stored copy's t_ctid is pointed at
 * that location.  Any toasted copy made here is freed before returning.
 *
 * Raises ERROR if the (possibly toasted) tuple exceeds MaxHeapTupleSize, or
 * if PageAddItem() fails.
 *
 * Defined at line 628 of rewriteheap.c; referenced by end_heap_rewrite() and
 * rewrite_heap_tuple().
 */
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
	Page		page = state->rs_buffer;
	Size		pageFreeSpace,
				saveFreeSpace;
	Size		len;
	OffsetNumber newoff;
	HeapTuple	heaptup;

	/*
	 * If the new tuple is too big for storage or contains already toasted
	 * out-of-line attributes from some other relation, invoke the toaster.
	 *
	 * Note: below this point, heaptup is the data we actually intend to store
	 * into the relation; tup is the caller's original untoasted data.
	 */
	if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
	{
		/* toast table entries should never be recursively toasted */
		Assert(!HeapTupleHasExternal(tup));
		heaptup = tup;
	}
	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
										 HEAP_INSERT_SKIP_FSM |
										 (state->rs_use_wal ?
										  0 : HEAP_INSERT_SKIP_WAL));
	else
		heaptup = tup;

	len = MAXALIGN(heaptup->t_len); /* be conservative */

	/*
	 * If we're gonna fail for oversize tuple, do it right away
	 */
	if (len > MaxHeapTupleSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("row is too big: size %zu, maximum size %zu",
						len, MaxHeapTupleSize)));

	/* Compute desired extra freespace due to fillfactor option */
	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
												   HEAP_DEFAULT_FILLFACTOR);

	/* Now we can check to see if there's enough free space already. */
	if (state->rs_buffer_valid)
	{
		pageFreeSpace = PageGetHeapFreeSpace(page);

		if (len + saveFreeSpace > pageFreeSpace)
		{
			/* Doesn't fit, so write out the existing page */

			/* XLOG stuff */
			if (state->rs_use_wal)
				log_newpage(&state->rs_new_rel->rd_node,
							MAIN_FORKNUM,
							state->rs_blockno,
							page,
							true);

			/*
			 * Now write the page.  We pass skipFsync = true to smgrextend()
			 * even for a non-temp table, because there's no need for smgr to
			 * schedule an fsync for this write; we'll do it ourselves in
			 * end_heap_rewrite.
			 */
			RelationOpenSmgr(state->rs_new_rel);

			PageSetChecksumInplace(page, state->rs_blockno);

			smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
					   state->rs_blockno, (char *) page, true);

			state->rs_blockno++;
			state->rs_buffer_valid = false;
		}
	}

	if (!state->rs_buffer_valid)
	{
		/* Initialize a new empty page */
		PageInit(page, BLCKSZ, 0);
		state->rs_buffer_valid = true;
	}

	/* And now we can insert the tuple into the page */
	newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
						 InvalidOffsetNumber, false, true);
	if (newoff == InvalidOffsetNumber)
		elog(ERROR, "failed to add tuple");

	/* Update caller's t_self to the actual position where it was stored */
	ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);

	/*
	 * Insert the correct position into CTID of the stored tuple, too, if the
	 * caller didn't supply a valid CTID.
	 */
	if (!ItemPointerIsValid(&tup->t_data->t_ctid))
	{
		ItemId		newitemid;
		HeapTupleHeader onpage_tup;

		newitemid = PageGetItemId(page, newoff);
		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);

		onpage_tup->t_ctid = tup->t_self;
	}

	/* If heaptup is a private copy, release it. */
	if (heaptup != tup)
		heap_freetuple(heaptup);
}
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:60
#define TOAST_TUPLE_THRESHOLD
Definition: tuptoaster.h:55
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: tuptoaster.c:536
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:144
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:412
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:460
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:666
ItemPointerData t_ctid
Definition: htup_details.h:150
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
#define ereport(elevel, rest)
Definition: elog.h:122
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:307
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:231
#define RELKIND_TOASTVALUE
Definition: pg_class.h:163
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:85
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1199
#define MAXALIGN(LEN)
Definition: c.h:588
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:674
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog
Definition: elog.h:219
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:286
BlockNumber rs_blockno
Definition: rewriteheap.c:146
#define PageGetItem(page, itemId)
Definition: bufpage.h:336
Pointer Page
Definition: bufpage.h:74
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:105
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:41
/*
 * rewrite_heap_dead_tuple
 *		Report that old_tuple, a tuple of the old heap, is dead.
 *
 * An earlier member of old_tuple's update chain may already have been passed
 * to rewrite_heap_tuple() and stashed in state->rs_unresolved_tups, waiting
 * to learn where its successor lands in the new heap.  If that successor is
 * old_tuple, the stashed tuple is in fact dead as well.  The simple
 * xmax < OldestXmin test in HeapTupleSatisfiesVacuum cannot always detect
 * this, because a tuple's xmin may legitimately be greater than its xmax.
 * Such a stashed entry is looked up by old_tuple's xmin and TID.  If found,
 * it is discarded together with its tuple copy.
 *
 * Only this order of discovery is handled.  If the dead tuple is seen before
 * the recently-dead one that points at it, the latter remains in
 * rs_unresolved_tups at the end.  That can happen anyway, when a vacuum has
 * already removed the dead tuple from the chain.
 *
 * Returns true if a stashed entry was removed, false otherwise.
 *
 * Defined at line 578 of rewriteheap.c; referenced by copy_heap_data().
 */
bool
rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
{
	TidHashKey	key;
	UnresolvedTup pending;
	bool		removed;

	/*
	 * Zero the whole key before filling it in -- presumably so that padding
	 * bytes cannot affect hashing or comparison of the key.
	 */
	memset(&key, 0, sizeof(key));
	key.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
	key.tid = old_tuple->t_self;

	pending = hash_search(state->rs_unresolved_tups, &key, HASH_FIND, NULL);
	if (pending == NULL)
		return false;			/* nothing was waiting on this tuple */

	/* release the stashed tuple copy, then the hash table entry itself */
	heap_freetuple(pending->tuple);
	hash_search(state->rs_unresolved_tups, &key, HASH_REMOVE, &removed);
	Assert(removed);

	return true;
}
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
HeapTupleHeader t_data
Definition: htup.h:67
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:176
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
TransactionId xmin
Definition: rewriteheap.c:175
/*
 * rewrite_heap_tuple
 *		Add a tuple to the new heap, preserving update-chain links.
 *
 * old_tuple is the tuple as found in the old heap; new_tuple is the version
 * to be stored.  Setup of new_tuple:
 *   - its t_heap visibility fields and HEAP_XACT_MASK infomask bits are
 *     overwritten from old_tuple, and its HEAP2_XACT_MASK bits are cleared;
 *   - eligible xmin/xmax values are frozen with heap_freeze_tuple() using
 *     state->rs_freeze_xid and state->rs_cutoff_multi.
 *
 * old_tuple counts as updated when its xmax is valid, it is not lock-only,
 * and its t_ctid points at another tuple.  In that case:
 *   - If the successor has already been written, new_tuple's t_ctid is
 *     pointed at the successor's new location via rs_old_new_tid_map.
 *   - Otherwise a copy of new_tuple is stashed in rs_unresolved_tups and
 *     nothing is written yet.
 *
 * Once new_tuple is written, the function checks whether it is the
 * successor (B) of some stashed tuple (A).  A tuple is a candidate B if it
 * is HEAP_UPDATED and its xmin does not precede state->rs_oldest_xmin.
 *   - If A is stashed, A's t_ctid is set to B's new location and A is
 *     written, repeating back along the chain.
 *   - Otherwise B's new location is recorded in rs_old_new_tid_map.
 *
 * new_tuple->t_self is set by raw_heap_insert() to the written location.
 * Work is done in state->rs_cxt, and the caller's memory context is
 * restored before returning.
 *
 * Defined at line 380 of rewriteheap.c; referenced by
 * reform_and_rewrite_tuple().
 */
void
rewrite_heap_tuple(RewriteState state,
				   HeapTuple old_tuple, HeapTuple new_tuple)
{
	MemoryContext old_cxt;
	ItemPointerData old_tid;
	TidHashKey	hashkey;
	bool		found;
	bool		free_new;

	old_cxt = MemoryContextSwitchTo(state->rs_cxt);

	/*
	 * Copy the original tuple's visibility information into new_tuple.
	 *
	 * XXX we might later need to copy some t_infomask2 bits, too? Right now,
	 * we intentionally clear the HOT status bits.
	 */
	memcpy(&new_tuple->t_data->t_choice.t_heap,
		   &old_tuple->t_data->t_choice.t_heap,
		   sizeof(HeapTupleFields));

	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
	new_tuple->t_data->t_infomask |=
		old_tuple->t_data->t_infomask & HEAP_XACT_MASK;

	/*
	 * While we have our hands on the tuple, we may as well freeze any
	 * eligible xmin or xmax, so that future VACUUM effort can be saved.
	 */
	heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
					  state->rs_cutoff_multi);

	/*
	 * Invalid ctid means that ctid should point to the tuple itself. We'll
	 * override it later if the tuple is part of an update chain.
	 */
	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);

	/*
	 * If the tuple has been updated, check the old-to-new mapping hash table.
	 */
	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
		!(ItemPointerEquals(&(old_tuple->t_self),
							&(old_tuple->t_data->t_ctid))))
	{
		OldToNewMapping mapping;

		memset(&hashkey, 0, sizeof(hashkey));
		hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
		hashkey.tid = old_tuple->t_data->t_ctid;

		mapping = (OldToNewMapping)
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_FIND, NULL);

		if (mapping != NULL)
		{
			/*
			 * We've already copied the tuple that t_ctid points to, so we can
			 * set the ctid of this tuple to point to the new location, and
			 * insert it right away.
			 */
			new_tuple->t_data->t_ctid = mapping->new_tid;

			/* We don't need the mapping entry anymore */
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_REMOVE, &found);
			Assert(found);
		}
		else
		{
			/*
			 * We haven't seen the tuple t_ctid points to yet. Stash this
			 * tuple into unresolved_tups to be written later.
			 */
			UnresolvedTup unresolved;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_ENTER, &found);
			Assert(!found);

			unresolved->old_tid = old_tuple->t_self;
			unresolved->tuple = heap_copytuple(new_tuple);

			/*
			 * We can't do anything more now, since we don't know where the
			 * tuple will be written.
			 */
			MemoryContextSwitchTo(old_cxt);
			return;
		}
	}

	/*
	 * Now we will write the tuple, and then check to see if it is the B tuple
	 * in any new or known pair. When we resolve a known pair, we will be
	 * able to write that pair's A tuple, and then we have to check if it
	 * resolves some other pair. Hence, we need a loop here.
	 */
	old_tid = old_tuple->t_self;
	free_new = false;

	for (;;)
	{
		ItemPointerData new_tid;

		/* Insert the tuple and find out where it's put in new_heap */
		raw_heap_insert(state, new_tuple);
		new_tid = new_tuple->t_self;

		logical_rewrite_heap_tuple(state, old_tid, new_tuple);

		/*
		 * If the tuple is the updated version of a row, and the prior version
		 * wouldn't be DEAD yet, then we need to either resolve the prior
		 * version (if it's waiting in rs_unresolved_tups), or make an entry
		 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
		 * previous tuple's xmax would equal this one's xmin, so it's
		 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
		 */
		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
								   state->rs_oldest_xmin))
		{
			/*
			 * Okay, this is B in an update pair.  See if we've seen A.
			 */
			UnresolvedTup unresolved;

			memset(&hashkey, 0, sizeof(hashkey));
			hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
			hashkey.tid = old_tid;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_FIND, NULL);

			if (unresolved != NULL)
			{
				/*
				 * We have seen and memorized the previous tuple already. Now
				 * that we know where we inserted the tuple its t_ctid points
				 * to, fix its t_ctid and insert it to the new heap.
				 */
				if (free_new)
					heap_freetuple(new_tuple);
				new_tuple = unresolved->tuple;
				free_new = true;
				old_tid = unresolved->old_tid;
				new_tuple->t_data->t_ctid = new_tid;

				/*
				 * We don't need the hash entry anymore, but don't free its
				 * tuple just yet.
				 */
				hash_search(state->rs_unresolved_tups, &hashkey,
							HASH_REMOVE, &found);
				Assert(found);

				/* loop back to insert the previous tuple in the chain */
				continue;
			}
			else
			{
				/*
				 * Remember the new tid of this tuple. We'll use it to set the
				 * ctid when we find the previous tuple in the chain.
				 */
				OldToNewMapping mapping;

				mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
									  HASH_ENTER, &found);
				Assert(!found);

				mapping->new_tid = new_tid;
			}
		}

		/* Done with this (chain of) tuples, for now */
		if (free_new)
			heap_freetuple(new_tuple);
		break;
	}

	MemoryContextSwitchTo(old_cxt);
}
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:359
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
Definition: tqual.c:1585
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:608
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:146
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1044
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:269
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
#define HEAP_UPDATED
Definition: htup_details.h:195
ItemPointerData old_tid
Definition: rewriteheap.c:185
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6769
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData new_tid
Definition: rewriteheap.c:194
#define HEAP_XMAX_INVALID
Definition: htup_details.h:193
ItemPointerData t_ctid
Definition: htup_details.h:150
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
MemoryContext rs_cxt
Definition: rewriteheap.c:158
ItemPointerData tid
Definition: rewriteheap.c:176
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:307
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
TransactionId xmin
Definition: rewriteheap.c:175
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:628
#define HEAP_XACT_MASK
Definition: htup_details.h:204
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:197