PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
#include "storage/procarray.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 248 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by copy_heap_data().

251 {
253  MemoryContext rw_cxt;
254  MemoryContext old_cxt;
255  HASHCTL hash_ctl;
256 
257  /*
258  * To ease cleanup, make a separate context that will contain the
259  * RewriteState struct itself plus all subsidiary data.
260  */
262  "Table rewrite",
264  old_cxt = MemoryContextSwitchTo(rw_cxt);
265 
266  /* Create and fill in the state struct */
267  state = palloc0(sizeof(RewriteStateData));
268 
269  state->rs_old_rel = old_heap;
270  state->rs_new_rel = new_heap;
271  state->rs_buffer = (Page) palloc(BLCKSZ);
272  /* new_heap needn't be empty, just locked */
273  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274  state->rs_buffer_valid = false;
275  state->rs_use_wal = use_wal;
276  state->rs_oldest_xmin = oldest_xmin;
277  state->rs_freeze_xid = freeze_xid;
278  state->rs_cutoff_multi = cutoff_multi;
279  state->rs_cxt = rw_cxt;
280 
281  /* Initialize hash tables used to track update chains */
282  memset(&hash_ctl, 0, sizeof(hash_ctl));
283  hash_ctl.keysize = sizeof(TidHashKey);
284  hash_ctl.entrysize = sizeof(UnresolvedTupData);
285  hash_ctl.hcxt = state->rs_cxt;
286 
287  state->rs_unresolved_tups =
288  hash_create("Rewrite / Unresolved ctids",
289  128, /* arbitrary initial size */
290  &hash_ctl,
292 
293  hash_ctl.entrysize = sizeof(OldToNewMappingData);
294 
295  state->rs_old_new_tid_map =
296  hash_create("Rewrite / Old to new tid map",
297  128, /* arbitrary initial size */
298  &hash_ctl,
300 
301  MemoryContextSwitchTo(old_cxt);
302 
304 
305  return state;
306 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:144
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
void * palloc0(Size size)
Definition: mcxt.c:955
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:199
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:803
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:924
Relation rs_old_rel
Definition: rewriteheap.c:143
BlockNumber rs_blockno
Definition: rewriteheap.c:146
Pointer Page
Definition: bufpage.h:74

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1202 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf(), stat, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1203 {
1204  XLogRecPtr cutoff;
1205  XLogRecPtr redo;
1206  DIR *mappings_dir;
1207  struct dirent *mapping_de;
1208  char path[MAXPGPATH + 20];
1209 
1210  /*
1211  * We start off with a minimum of the last redo pointer. No new decoding
1212  * slot will start before that, so that's a safe upper bound for removal.
1213  */
1214  redo = GetRedoRecPtr();
1215 
1216  /* now check for the restart ptrs from existing slots */
1218 
1219  /* don't start earlier than the restart lsn */
1220  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1221  cutoff = redo;
1222 
1223  mappings_dir = AllocateDir("pg_logical/mappings");
1224  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1225  {
1226  struct stat statbuf;
1227  Oid dboid;
1228  Oid relid;
1229  XLogRecPtr lsn;
1230  TransactionId rewrite_xid;
1231  TransactionId create_xid;
1232  uint32 hi,
1233  lo;
1234 
1235  if (strcmp(mapping_de->d_name, ".") == 0 ||
1236  strcmp(mapping_de->d_name, "..") == 0)
1237  continue;
1238 
1239  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1240  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1241  continue;
1242 
1243  /* Skip over files that cannot be ours. */
1244  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1245  continue;
1246 
1247  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1248  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1249  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1250 
1251  lsn = ((uint64) hi) << 32 | lo;
1252 
1253  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1254  {
1255  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1256  if (unlink(path) < 0)
1257  ereport(ERROR,
1259  errmsg("could not remove file \"%s\": %m", path)));
1260  }
1261  else
1262  {
1263  int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1264 
1265  /*
1266  * The file cannot vanish due to concurrency since this function
1267  * is the only one removing logical mappings and it's run while
1268  * CheckpointLock is held exclusively.
1269  */
1270  if (fd < 0)
1271  ereport(ERROR,
1273  errmsg("could not open file \"%s\": %m", path)));
1274 
1275  /*
1276  * We could try to avoid fsyncing files that either haven't
1277  * changed or have only been created since the checkpoint's start,
1278  * but it's currently not deemed worth the effort.
1279  */
1281  if (pg_fsync(fd) != 0)
1282  ereport(ERROR,
1284  errmsg("could not fsync file \"%s\": %m", path)));
1286  CloseTransientFile(fd);
1287  }
1288  }
1289  FreeDir(mappings_dir);
1290 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:474
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2396
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:784
unsigned int uint32
Definition: c.h:325
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2600
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
#define S_ISREG(m)
Definition: win32_port.h:310
int CloseTransientFile(int fd)
Definition: fd.c:2566
#define stat(a, b)
Definition: win32_port.h:266
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2666
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8212
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
int pg_fsync(int fd)
Definition: fd.c:341
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
int FreeDir(DIR *dir)
Definition: fd.c:2718

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 314 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by copy_heap_data().

315 {
316  HASH_SEQ_STATUS seq_status;
317  UnresolvedTup unresolved;
318 
319  /*
320  * Write any remaining tuples in the UnresolvedTups table. If we have any
321  * left, they should in fact be dead, but let's err on the safe side.
322  */
323  hash_seq_init(&seq_status, state->rs_unresolved_tups);
324 
325  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326  {
327  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328  raw_heap_insert(state, unresolved->tuple);
329  }
330 
331  /* Write the last page, if any */
332  if (state->rs_buffer_valid)
333  {
334  if (state->rs_use_wal)
335  log_newpage(&state->rs_new_rel->rd_node,
336  MAIN_FORKNUM,
337  state->rs_blockno,
338  state->rs_buffer,
339  true);
341 
343 
345  (char *) state->rs_buffer, true);
346  }
347 
348  /*
349  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
350  * to ensure that the toast table gets fsync'd too.
351  *
352  * It's obvious that we must do this when not WAL-logging. It's less
353  * obvious that we have to do it even if we did WAL-log the pages. The
354  * reason is the same as in tablecmds.c's copy_relation_data(): we're
355  * writing data that's not in shared buffers, and so a CHECKPOINT
356  * occurring during the rewriteheap operation won't have fsync'd data we
357  * wrote before the checkpoint.
358  */
359  if (RelationNeedsWAL(state->rs_new_rel))
360  heap_sync(state->rs_new_rel);
361 
363 
364  /* Deleting the context frees everything */
365  MemoryContextDelete(state->rs_cxt);
366 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:950
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Relation rs_new_rel
Definition: rewriteheap.c:144
void heap_sync(Relation rel)
Definition: heapam.c:9356
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:465
ItemPointerData t_ctid
Definition: htup_details.h:157
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
MemoryContext rs_cxt
Definition: rewriteheap.c:158
RelFileNode rd_node
Definition: rel.h:55
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1195
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
#define RelationNeedsWAL(relation)
Definition: rel.h:510
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:632
BlockNumber rs_blockno
Definition: rewriteheap.c:146

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1121 of file rewriteheap.c.

References CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf(), xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1122 {
1123  char path[MAXPGPATH];
1124  int fd;
1125  xl_heap_rewrite_mapping *xlrec;
1126  uint32 len;
1127  char *data;
1128 
1129  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1130 
1131  snprintf(path, MAXPGPATH,
1132  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1133  xlrec->mapped_db, xlrec->mapped_rel,
1134  (uint32) (xlrec->start_lsn >> 32),
1135  (uint32) xlrec->start_lsn,
1136  xlrec->mapped_xid, XLogRecGetXid(r));
1137 
1138  fd = OpenTransientFile(path,
1139  O_CREAT | O_WRONLY | PG_BINARY);
1140  if (fd < 0)
1141  ereport(ERROR,
1143  errmsg("could not create file \"%s\": %m", path)));
1144 
1145  /*
1146  * Truncate all data that's not guaranteed to have been safely fsynced (by
1147  * previous record or by the last checkpoint).
1148  */
1150  if (ftruncate(fd, xlrec->offset) != 0)
1151  ereport(ERROR,
1153  errmsg("could not truncate file \"%s\" to %u: %m",
1154  path, (uint32) xlrec->offset)));
1156 
1157  /* now seek to the position we want to write our data to */
1158  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1159  ereport(ERROR,
1161  errmsg("could not seek to end of file \"%s\": %m",
1162  path)));
1163 
1164  data = XLogRecGetData(r) + sizeof(*xlrec);
1165 
1166  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1167 
1168  /* write out tail end of mapping file (again) */
1170  if (write(fd, data, len) != len)
1171  ereport(ERROR,
1173  errmsg("could not write to file \"%s\": %m", path)));
1175 
1176  /*
1177  * Now fsync all previously written data. We could improve things and only
1178  * do this for the last write to a file, but the required bookkeeping
1179  * doesn't seem worth the trouble.
1180  */
1182  if (pg_fsync(fd) != 0)
1183  ereport(ERROR,
1185  errmsg("could not fsync file \"%s\": %m", path)));
1187 
1188  CloseTransientFile(fd);
1189 }
#define write(a, b, c)
Definition: win32.h:14
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2396
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:325
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2566
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:224
TransactionId mapped_xid
Definition: heapam_xlog.h:378
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:341
#define ftruncate(a, b)
Definition: win32_port.h:60

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 803 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

804 {
805  HASHCTL hash_ctl;
806  TransactionId logical_xmin;
807 
808  /*
809  * We only need to persist these mappings if the rewritten table can be
810  * accessed during logical decoding, if not, we can skip doing any
811  * additional work.
812  */
813  state->rs_logical_rewrite =
815 
816  if (!state->rs_logical_rewrite)
817  return;
818 
819  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
820 
821  /*
822  * If there are no logical slots in progress we don't need to do anything,
823  * there cannot be any remappings for relevant rows yet. The relation's
824  * lock protects us against races.
825  */
826  if (logical_xmin == InvalidTransactionId)
827  {
828  state->rs_logical_rewrite = false;
829  return;
830  }
831 
832  state->rs_logical_xmin = logical_xmin;
834  state->rs_num_rewrite_mappings = 0;
835 
836  memset(&hash_ctl, 0, sizeof(hash_ctl));
837  hash_ctl.keysize = sizeof(TransactionId);
838  hash_ctl.entrysize = sizeof(RewriteMappingFile);
839  hash_ctl.hcxt = state->rs_cxt;
840 
841  state->rs_logical_mappings =
842  hash_create("Logical rewrite mapping",
843  128, /* arbitrary initial size */
844  &hash_ctl,
846 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:474
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11203
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:564
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:143

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 950 of file rewriteheap.c.

References ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

951 {
952  HASH_SEQ_STATUS seq_status;
953  RewriteMappingFile *src;
954 
955  /* done, no logical rewrite in progress */
956  if (!state->rs_logical_rewrite)
957  return;
958 
959  /* writeout remaining in-memory entries */
960  if (state->rs_num_rewrite_mappings > 0)
962 
963  /* Iterate over all mappings we have written and fsync the files. */
964  hash_seq_init(&seq_status, state->rs_logical_mappings);
965  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
966  {
968  ereport(ERROR,
970  errmsg("could not fsync file \"%s\": %m", src->path)));
971  FileClose(src->vfd);
972  }
973  /* memory context cleanup will deal with the rest */
974 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2072
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void FileClose(File file)
Definition: fd.c:1742
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:852
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 852 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

853 {
854  HASH_SEQ_STATUS seq_status;
855  RewriteMappingFile *src;
856  dlist_mutable_iter iter;
857 
858  Assert(state->rs_logical_rewrite);
859 
860  /* no logical rewrite in progress, no need to iterate over mappings */
861  if (state->rs_num_rewrite_mappings == 0)
862  return;
863 
864  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
865  state->rs_num_rewrite_mappings);
866 
867  hash_seq_init(&seq_status, state->rs_logical_mappings);
868  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
869  {
870  char *waldata;
871  char *waldata_start;
873  Oid dboid;
874  uint32 len;
875  int written;
876 
877  /* this file hasn't got any new mappings */
878  if (src->num_mappings == 0)
879  continue;
880 
881  if (state->rs_old_rel->rd_rel->relisshared)
882  dboid = InvalidOid;
883  else
884  dboid = MyDatabaseId;
885 
886  xlrec.num_mappings = src->num_mappings;
887  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
888  xlrec.mapped_xid = src->xid;
889  xlrec.mapped_db = dboid;
890  xlrec.offset = src->off;
891  xlrec.start_lsn = state->rs_begin_lsn;
892 
893  /* write all mappings consecutively */
894  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
895  waldata_start = waldata = palloc(len);
896 
897  /*
898  * collect data we need to write out, but don't modify ondisk data yet
899  */
900  dlist_foreach_modify(iter, &src->mappings)
901  {
903 
904  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
905 
906  memcpy(waldata, &pmap->map, sizeof(pmap->map));
907  waldata += sizeof(pmap->map);
908 
909  /* remove from the list and free */
910  dlist_delete(&pmap->node);
911  pfree(pmap);
912 
913  /* update bookkeeping */
914  state->rs_num_rewrite_mappings--;
915  src->num_mappings--;
916  }
917 
918  Assert(src->num_mappings == 0);
919  Assert(waldata == waldata_start + len);
920 
921  /*
922  * Note that we deviate from the usual WAL coding practices here,
923  * check the above "Logical rewrite support" comment for reasoning.
924  */
925  written = FileWrite(src->vfd, waldata_start, len,
927  if (written != len)
928  ereport(ERROR,
930  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
931  written, len)));
932  src->off += len;
933 
934  XLogBeginInsert();
935  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
936  XLogRegisterData(waldata_start, len);
937 
938  /* write xlog record */
939  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
940 
941  pfree(waldata_start);
942  }
943  Assert(state->rs_num_rewrite_mappings == 0);
944 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
TransactionId xid
Definition: rewriteheap.c:205
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:325
int FileWrite(File file, char *buffer, int amount, uint32 wait_event_info)
Definition: fd.c:1951
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Oid MyDatabaseId
Definition: globals.c:86
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:378
#define Assert(condition)
Definition: c.h:699
dlist_head mappings
Definition: rewriteheap.c:209
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:143
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:407

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1047 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1049 {
1050  ItemPointerData new_tid = new_tuple->t_self;
1051  TransactionId cutoff = state->rs_logical_xmin;
1052  TransactionId xmin;
1053  TransactionId xmax;
1054  bool do_log_xmin = false;
1055  bool do_log_xmax = false;
1057 
1058  /* no logical rewrite in progress, we don't need to log anything */
1059  if (!state->rs_logical_rewrite)
1060  return;
1061 
1062  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1063  /* use *GetUpdateXid to correctly deal with multixacts */
1064  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1065 
1066  /*
1067  * Log the mapping iff the tuple has been created recently.
1068  */
1069  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1070  do_log_xmin = true;
1071 
1072  if (!TransactionIdIsNormal(xmax))
1073  {
1074  /*
1075  * no xmax is set, can't have any permanent ones, so this check is
1076  * sufficient
1077  */
1078  }
1079  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1080  {
1081  /* only locked, we don't care */
1082  }
1083  else if (!TransactionIdPrecedes(xmax, cutoff))
1084  {
1085  /* tuple has been deleted recently, log */
1086  do_log_xmax = true;
1087  }
1088 
1089  /* if neither needs to be logged, we're done */
1090  if (!do_log_xmin && !do_log_xmax)
1091  return;
1092 
1093  /* fill out mapping information */
1094  map.old_node = state->rs_old_rel->rd_node;
1095  map.old_tid = old_tid;
1096  map.new_node = state->rs_new_rel->rd_node;
1097  map.new_tid = new_tid;
1098 
1099  /* ---
1100  * Now persist the mapping for the individual xids that are affected. We
1101  * need to log for both xmin and xmax if they aren't the same transaction
1102  * since the mapping files are per "affected" xid.
1103  * We don't muster all that much effort detecting whether xmin and xmax
1104  * are actually the same transaction, we just check whether the xid is the
1105  * same disregarding subtransactions. Logging too much is relatively
1106  * harmless and we could never do the check fully since subtransaction
1107  * data is thrown away during restarts.
1108  * ---
1109  */
1110  if (do_log_xmin)
1111  logical_rewrite_log_mapping(state, xmin, &map);
1112  /* separately log mapping for xmax unless it'd be redundant */
1113  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1114  logical_rewrite_log_mapping(state, xmax, &map);
1115 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:370
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:474
Relation rs_new_rel
Definition: rewriteheap.c:144
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:980
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:227
RelFileNode rd_node
Definition: rel.h:55
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:318
Relation rs_old_rel
Definition: rewriteheap.c:143
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData map 
)
static

Definition at line 980 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf(), and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

982 {
983  RewriteMappingFile *src;
985  Oid relid;
986  bool found;
987 
988  relid = RelationGetRelid(state->rs_old_rel);
989 
990  /* look for existing mappings for this 'mapped' xid */
991  src = hash_search(state->rs_logical_mappings, &xid,
992  HASH_ENTER, &found);
993 
994  /*
995  * We haven't yet had the need to map anything for this xid, create
996  * per-xid data structures.
997  */
998  if (!found)
999  {
1000  char path[MAXPGPATH];
1001  Oid dboid;
1002 
1003  if (state->rs_old_rel->rd_rel->relisshared)
1004  dboid = InvalidOid;
1005  else
1006  dboid = MyDatabaseId;
1007 
1008  snprintf(path, MAXPGPATH,
1009  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1010  dboid, relid,
1011  (uint32) (state->rs_begin_lsn >> 32),
1012  (uint32) state->rs_begin_lsn,
1013  xid, GetCurrentTransactionId());
1014 
1015  dlist_init(&src->mappings);
1016  src->num_mappings = 0;
1017  src->off = 0;
1018  memcpy(src->path, path, sizeof(path));
1019  src->vfd = PathNameOpenFile(path,
1020  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1021  if (src->vfd < 0)
1022  ereport(ERROR,
1024  errmsg("could not create file \"%s\": %m", path)));
1025  }
1026 
1027  pmap = MemoryContextAlloc(state->rs_cxt,
1028  sizeof(RewriteMappingDataEntry));
1029  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1030  dlist_push_tail(&src->mappings, &pmap->node);
1031  src->num_mappings++;
1032  state->rs_num_rewrite_mappings++;
1033 
1034  /*
1035  * Write out buffer every time we've too many in-memory entries across all
1036  * mapping files.
1037  */
1038  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1040 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1351
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1080
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:325
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
MemoryContext rs_cxt
Definition: rewriteheap.c:158
Oid MyDatabaseId
Definition: globals.c:86
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:209
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:852
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
Relation rs_old_rel
Definition: rewriteheap.c:143
#define RelationGetRelid(relation)
Definition: rel.h:407

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 632 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_SKIP_FSM, HEAP_INSERT_SKIP_WAL, HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, toast_insert_or_update(), and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

633 {
634  Page page = state->rs_buffer;
635  Size pageFreeSpace,
636  saveFreeSpace;
637  Size len;
638  OffsetNumber newoff;
639  HeapTuple heaptup;
640 
641  /*
642  * If the new tuple is too big for storage or contains already toasted
643  * out-of-line attributes from some other relation, invoke the toaster.
644  *
645  * Note: below this point, heaptup is the data we actually intend to store
646  * into the relation; tup is the caller's original untoasted data.
647  */
648  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
649  {
650  /* toast table entries should never be recursively toasted */
652  heaptup = tup;
653  }
654  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
655  heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
657  (state->rs_use_wal ?
658  0 : HEAP_INSERT_SKIP_WAL));
659  else
660  heaptup = tup;
661 
662  len = MAXALIGN(heaptup->t_len); /* be conservative */
663 
664  /*
665  * If we're gonna fail for oversize tuple, do it right away
666  */
667  if (len > MaxHeapTupleSize)
668  ereport(ERROR,
669  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
670  errmsg("row is too big: size %zu, maximum size %zu",
671  len, MaxHeapTupleSize)));
672 
673  /* Compute desired extra freespace due to fillfactor option */
674  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
676 
677  /* Now we can check to see if there's enough free space already. */
678  if (state->rs_buffer_valid)
679  {
680  pageFreeSpace = PageGetHeapFreeSpace(page);
681 
682  if (len + saveFreeSpace > pageFreeSpace)
683  {
684  /* Doesn't fit, so write out the existing page */
685 
686  /* XLOG stuff */
687  if (state->rs_use_wal)
688  log_newpage(&state->rs_new_rel->rd_node,
689  MAIN_FORKNUM,
690  state->rs_blockno,
691  page,
692  true);
693 
694  /*
695  * Now write the page. We say isTemp = true even if it's not a
696  * temp table, because there's no need for smgr to schedule an
697  * fsync for this write; we'll do it ourselves in
698  * end_heap_rewrite.
699  */
701 
702  PageSetChecksumInplace(page, state->rs_blockno);
703 
705  state->rs_blockno, (char *) page, true);
706 
707  state->rs_blockno++;
708  state->rs_buffer_valid = false;
709  }
710  }
711 
712  if (!state->rs_buffer_valid)
713  {
714  /* Initialize a new empty page */
715  PageInit(page, BLCKSZ, 0);
716  state->rs_buffer_valid = true;
717  }
718 
719  /* And now we can insert the tuple into the page */
720  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
721  InvalidOffsetNumber, false, true);
722  if (newoff == InvalidOffsetNumber)
723  elog(ERROR, "failed to add tuple");
724 
725  /* Update caller's t_self to the actual position where it was stored */
726  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
727 
728  /*
729  * Insert the correct position into CTID of the stored tuple, too, if the
730  * caller didn't supply a valid CTID.
731  */
732  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
733  {
734  ItemId newitemid;
735  HeapTupleHeader onpage_tup;
736 
737  newitemid = PageGetItemId(page, newoff);
738  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
739 
740  onpage_tup->t_ctid = tup->t_self;
741  }
742 
743  /* If heaptup is a private copy, release it. */
744  if (heaptup != tup)
745  heap_freetuple(heaptup);
746 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:60
#define TOAST_TUPLE_THRESHOLD
Definition: tuptoaster.h:55
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: tuptoaster.c:534
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:144
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:412
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
Form_pg_class rd_rel
Definition: rel.h:84
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1773
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:465
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:662
ItemPointerData t_ctid
Definition: htup_details.h:157
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:578
#define ereport(elevel, rest)
Definition: elog.h:122
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:298
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:231
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:55
#define Assert(condition)
Definition: c.h:699
size_t Size
Definition: c.h:433
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1195
#define MAXALIGN(LEN)
Definition: c.h:652
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:691
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog
Definition: elog.h:219
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:269
BlockNumber rs_blockno
Definition: rewriteheap.c:146
#define PageGetItem(page, itemId)
Definition: bufpage.h:336
Pointer Page
Definition: bufpage.h:74
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:105
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:41

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 582 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by copy_heap_data().

583 {
584  /*
585  * If we have already seen an earlier tuple in the update chain that
586  * points to this tuple, let's forget about that earlier tuple. It's in
587  * fact dead as well, our simple xmax < OldestXmin test in
588  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
589  * when xmin of a tuple is greater than xmax, which sounds
590  * counter-intuitive but is perfectly valid.
591  *
592  * We don't bother to try to detect the situation the other way round,
593  * when we encounter the dead tuple first and then the recently dead one
594  * that points to it. If that happens, we'll have some unmatched entries
595  * in the UnresolvedTups hash table at the end. That can happen anyway,
596  * because a vacuum might have removed the dead tuple in the chain before
597  * us.
598  */
599  UnresolvedTup unresolved;
600  TidHashKey hashkey;
601  bool found;
602 
603  memset(&hashkey, 0, sizeof(hashkey));
604  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
605  hashkey.tid = old_tuple->t_self;
606 
607  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
608  HASH_FIND, NULL);
609 
610  if (unresolved != NULL)
611  {
612  /* Need to free the contained tuple as well as the hashtable entry */
613  heap_freetuple(unresolved->tuple);
614  hash_search(state->rs_unresolved_tups, &hashkey,
615  HASH_REMOVE, &found);
616  Assert(found);
617  return true;
618  }
619 
620  return false;
621 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1773
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:699
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:318
TransactionId xmin
Definition: rewriteheap.c:175

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 380 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

382 {
383  MemoryContext old_cxt;
384  ItemPointerData old_tid;
385  TidHashKey hashkey;
386  bool found;
387  bool free_new;
388 
389  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
390 
391  /*
392  * Copy the original tuple's visibility information into new_tuple.
393  *
394  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
395  * we intentionally clear the HOT status bits.
396  */
397  memcpy(&new_tuple->t_data->t_choice.t_heap,
398  &old_tuple->t_data->t_choice.t_heap,
399  sizeof(HeapTupleFields));
400 
401  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
402  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
403  new_tuple->t_data->t_infomask |=
404  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
405 
406  /*
407  * While we have our hands on the tuple, we may as well freeze any
408  * eligible xmin or xmax, so that future VACUUM effort can be saved.
409  */
410  heap_freeze_tuple(new_tuple->t_data,
411  state->rs_old_rel->rd_rel->relfrozenxid,
412  state->rs_old_rel->rd_rel->relminmxid,
413  state->rs_freeze_xid,
414  state->rs_cutoff_multi);
415 
416  /*
417  * Invalid ctid means that ctid should point to the tuple itself. We'll
418  * override it later if the tuple is part of an update chain.
419  */
420  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
421 
422  /*
423  * If the tuple has been updated, check the old-to-new mapping hash table.
424  */
425  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
426  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
428  !(ItemPointerEquals(&(old_tuple->t_self),
429  &(old_tuple->t_data->t_ctid))))
430  {
431  OldToNewMapping mapping;
432 
433  memset(&hashkey, 0, sizeof(hashkey));
434  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
435  hashkey.tid = old_tuple->t_data->t_ctid;
436 
437  mapping = (OldToNewMapping)
438  hash_search(state->rs_old_new_tid_map, &hashkey,
439  HASH_FIND, NULL);
440 
441  if (mapping != NULL)
442  {
443  /*
444  * We've already copied the tuple that t_ctid points to, so we can
445  * set the ctid of this tuple to point to the new location, and
446  * insert it right away.
447  */
448  new_tuple->t_data->t_ctid = mapping->new_tid;
449 
450  /* We don't need the mapping entry anymore */
451  hash_search(state->rs_old_new_tid_map, &hashkey,
452  HASH_REMOVE, &found);
453  Assert(found);
454  }
455  else
456  {
457  /*
458  * We haven't seen the tuple t_ctid points to yet. Stash this
459  * tuple into unresolved_tups to be written later.
460  */
461  UnresolvedTup unresolved;
462 
463  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
464  HASH_ENTER, &found);
465  Assert(!found);
466 
467  unresolved->old_tid = old_tuple->t_self;
468  unresolved->tuple = heap_copytuple(new_tuple);
469 
470  /*
471  * We can't do anything more now, since we don't know where the
472  * tuple will be written.
473  */
474  MemoryContextSwitchTo(old_cxt);
475  return;
476  }
477  }
478 
479  /*
480  * Now we will write the tuple, and then check to see if it is the B tuple
481  * in any new or known pair. When we resolve a known pair, we will be
482  * able to write that pair's A tuple, and then we have to check if it
483  * resolves some other pair. Hence, we need a loop here.
484  */
485  old_tid = old_tuple->t_self;
486  free_new = false;
487 
488  for (;;)
489  {
490  ItemPointerData new_tid;
491 
492  /* Insert the tuple and find out where it's put in new_heap */
493  raw_heap_insert(state, new_tuple);
494  new_tid = new_tuple->t_self;
495 
496  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
497 
498  /*
499  * If the tuple is the updated version of a row, and the prior version
500  * wouldn't be DEAD yet, then we need to either resolve the prior
501  * version (if it's waiting in rs_unresolved_tups), or make an entry
502  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
503  * previous tuple's xmax would equal this one's xmin, so it's
504  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
505  */
506  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
508  state->rs_oldest_xmin))
509  {
510  /*
511  * Okay, this is B in an update pair. See if we've seen A.
512  */
513  UnresolvedTup unresolved;
514 
515  memset(&hashkey, 0, sizeof(hashkey));
516  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
517  hashkey.tid = old_tid;
518 
519  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
520  HASH_FIND, NULL);
521 
522  if (unresolved != NULL)
523  {
524  /*
525  * We have seen and memorized the previous tuple already. Now
526  * that we know where we inserted the tuple its t_ctid points
527  * to, fix its t_ctid and insert it to the new heap.
528  */
529  if (free_new)
530  heap_freetuple(new_tuple);
531  new_tuple = unresolved->tuple;
532  free_new = true;
533  old_tid = unresolved->old_tid;
534  new_tuple->t_data->t_ctid = new_tid;
535 
536  /*
537  * We don't need the hash entry anymore, but don't free its
538  * tuple just yet.
539  */
540  hash_search(state->rs_unresolved_tups, &hashkey,
541  HASH_REMOVE, &found);
542  Assert(found);
543 
544  /* loop back to insert the previous tuple in the chain */
545  continue;
546  }
547  else
548  {
549  /*
550  * Remember the new tid of this tuple. We'll use it to set the
551  * ctid when we find the previous tuple in the chain.
552  */
553  OldToNewMapping mapping;
554 
555  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
556  HASH_ENTER, &found);
557  Assert(!found);
558 
559  mapping->new_tid = new_tid;
560  }
561  }
562 
563  /* Done with this (chain of) tuples, for now */
564  if (free_new)
565  heap_freetuple(new_tuple);
566  break;
567  }
568 
569  MemoryContextSwitchTo(old_cxt);
570 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:370
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
Definition: tqual.c:1596
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:722
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:153
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1047
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:280
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:453
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
#define HEAP_UPDATED
Definition: htup_details.h:206
ItemPointerData old_tid
Definition: rewriteheap.c:185
Form_pg_class rd_rel
Definition: rel.h:84
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1773
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:194
#define HEAP_XMAX_INVALID
Definition: htup_details.h:204
ItemPointerData t_ctid
Definition: htup_details.h:157
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:7030
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
MemoryContext rs_cxt
Definition: rewriteheap.c:158
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:699
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:318
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
TransactionId xmin
Definition: rewriteheap.c:175
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:632
#define HEAP_XACT_MASK
Definition: htup_details.h:215
Relation rs_old_rel
Definition: rewriteheap.c:143
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:197