PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 188 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 180 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 239 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by heapam_relation_copy_for_cluster().

242 {
243  RewriteState state;
244  MemoryContext rw_cxt;
245  MemoryContext old_cxt;
246  HASHCTL hash_ctl;
247 
248  /*
249  * To ease cleanup, make a separate context that will contain the
250  * RewriteState struct itself plus all subsidiary data.
251  */
252  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
253  "Table rewrite",
254  ALLOCSET_DEFAULT_SIZES);
255  old_cxt = MemoryContextSwitchTo(rw_cxt);
256 
257  /* Create and fill in the state struct */
258  state = palloc0(sizeof(RewriteStateData));
259 
260  state->rs_old_rel = old_heap;
261  state->rs_new_rel = new_heap;
262  state->rs_buffer = (Page) palloc(BLCKSZ);
263  /* new_heap needn't be empty, just locked */
264  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
265  state->rs_buffer_valid = false;
266  state->rs_use_wal = use_wal;
267  state->rs_oldest_xmin = oldest_xmin;
268  state->rs_freeze_xid = freeze_xid;
269  state->rs_cutoff_multi = cutoff_multi;
270  state->rs_cxt = rw_cxt;
271 
272  /* Initialize hash tables used to track update chains */
273  memset(&hash_ctl, 0, sizeof(hash_ctl));
274  hash_ctl.keysize = sizeof(TidHashKey);
275  hash_ctl.entrysize = sizeof(UnresolvedTupData);
276  hash_ctl.hcxt = state->rs_cxt;
277 
278  state->rs_unresolved_tups =
279  hash_create("Rewrite / Unresolved ctids",
280  128, /* arbitrary initial size */
281  &hash_ctl,
282  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
283 
284  hash_ctl.entrysize = sizeof(OldToNewMappingData);
285 
286  state->rs_old_new_tid_map =
287  hash_create("Rewrite / Old to new tid map",
288  128, /* arbitrary initial size */
289  &hash_ctl,
290  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
291 
292  MemoryContextSwitchTo(old_cxt);
293 
294  logical_begin_heap_rewrite(state);
295 
296  return state;
297 }
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:143
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:147
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:135
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:152
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:141
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:149
void * palloc0(Size size)
Definition: mcxt.c:980
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:198
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:805
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:153
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:949
Relation rs_old_rel
Definition: rewriteheap.c:134
BlockNumber rs_blockno
Definition: rewriteheap.c:137
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1213 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1214 {
1215  XLogRecPtr cutoff;
1216  XLogRecPtr redo;
1217  DIR *mappings_dir;
1218  struct dirent *mapping_de;
1219  char path[MAXPGPATH + 20];
1220 
1221  /*
1222  * We start of with a minimum of the last redo pointer. No new decoding
1223  * slot will start before that, so that's a safe upper bound for removal.
1224  */
1225  redo = GetRedoRecPtr();
1226 
1227  /* now check for the restart ptrs from existing slots */
1228  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1229 
1230  /* don't start earlier than the restart lsn */
1231  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1232  cutoff = redo;
1233 
1234  mappings_dir = AllocateDir("pg_logical/mappings");
1235  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1236  {
1237  struct stat statbuf;
1238  Oid dboid;
1239  Oid relid;
1240  XLogRecPtr lsn;
1241  TransactionId rewrite_xid;
1242  TransactionId create_xid;
1243  uint32 hi,
1244  lo;
1245 
1246  if (strcmp(mapping_de->d_name, ".") == 0 ||
1247  strcmp(mapping_de->d_name, "..") == 0)
1248  continue;
1249 
1250  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1251  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1252  continue;
1253 
1254  /* Skip over files that cannot be ours. */
1255  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1256  continue;
1257 
1258  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1259  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1260  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1261 
1262  lsn = ((uint64) hi) << 32 | lo;
1263 
1264  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1265  {
1266  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1267  if (unlink(path) < 0)
1268  ereport(ERROR,
1269  (errcode_for_file_access(),
1270  errmsg("could not remove file \"%s\": %m", path)));
1271  }
1272  else
1273  {
1274  /* on some operating systems fsyncing a file requires O_RDWR */
1275  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1276 
1277  /*
1278  * The file cannot vanish due to concurrency since this function
1279  * is the only one removing logical mappings and it's run while
1280  * CheckpointLock is held exclusively.
1281  */
1282  if (fd < 0)
1283  ereport(ERROR,
1284  (errcode_for_file_access(),
1285  errmsg("could not open file \"%s\": %m", path)));
1286 
1287  /*
1288  * We could try to avoid fsyncing files that either haven't
1289  * changed or have only been created since the checkpoint's start,
1290  * but it's currently not deemed worth the effort.
1291  */
1292  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1293  if (pg_fsync(fd) != 0)
1294  ereport(data_sync_elevel(ERROR),
1295  (errcode_for_file_access(),
1296  errmsg("could not fsync file \"%s\": %m", path)));
1297  pgstat_report_wait_end();
1298 
1299  if (CloseTransientFile(fd) != 0)
1300  ereport(ERROR,
1301  (errcode_for_file_access(),
1302  errmsg("could not close file \"%s\": %m", path)));
1303  }
1304  }
1305  FreeDir(mappings_dir);
1306 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:514
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:791
unsigned int uint32
Definition: c.h:359
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2503
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define S_ISREG(m)
Definition: win32_port.h:299
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define stat(a, b)
Definition: win32_port.h:255
int data_sync_elevel(int elevel)
Definition: fd.c:3519
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2569
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8208
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int pg_fsync(int fd)
Definition: fd.c:330
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:192
int FreeDir(DIR *dir)
Definition: fd.c:2621

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 305 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

306 {
307  HASH_SEQ_STATUS seq_status;
308  UnresolvedTup unresolved;
309 
310  /*
311  * Write any remaining tuples in the UnresolvedTups table. If we have any
312  * left, they should in fact be dead, but let's err on the safe side.
313  */
314  hash_seq_init(&seq_status, state->rs_unresolved_tups);
315 
316  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
317  {
318  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
319  raw_heap_insert(state, unresolved->tuple);
320  }
321 
322  /* Write the last page, if any */
323  if (state->rs_buffer_valid)
324  {
325  if (state->rs_use_wal)
326  log_newpage(&state->rs_new_rel->rd_node,
327  MAIN_FORKNUM,
328  state->rs_blockno,
329  state->rs_buffer,
330  true);
331  RelationOpenSmgr(state->rs_new_rel);
332 
333  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
334 
335  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
336  (char *) state->rs_buffer, true);
337  }
338 
339  /*
340  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
341  * to ensure that the toast table gets fsync'd too.
342  *
343  * It's obvious that we must do this when not WAL-logging. It's less
344  * obvious that we have to do it even if we did WAL-log the pages. The
345  * reason is the same as in storage.c's RelationCopyStorage(): we're
346  * writing data that's not in shared buffers, and so a CHECKPOINT
347  * occurring during the rewriteheap operation won't have fsync'd data we
348  * wrote before the checkpoint.
349  */
350  if (RelationNeedsWAL(state->rs_new_rel))
351  heap_sync(state->rs_new_rel);
352 
353  logical_end_heap_rewrite(state);
354 
355  /* Deleting the context frees everything */
356  MemoryContextDelete(state->rs_cxt);
357 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:952
struct SMgrRelationData * rd_smgr
Definition: rel.h:56
Relation rs_new_rel
Definition: rewriteheap.c:135
void heap_sync(Relation rel)
Definition: heapam.c:8938
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:479
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:152
MemoryContext rs_cxt
Definition: rewriteheap.c:149
RelFileNode rd_node
Definition: rel.h:54
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1198
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
#define RelationNeedsWAL(relation)
Definition: rel.h:524
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:483
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:623
BlockNumber rs_blockno
Definition: rewriteheap.c:137

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState *r)

Definition at line 1123 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1124 {
1125  char path[MAXPGPATH];
1126  int fd;
1127  xl_heap_rewrite_mapping *xlrec;
1128  uint32 len;
1129  char *data;
1130 
1131  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1132 
1133  snprintf(path, MAXPGPATH,
1134  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1135  xlrec->mapped_db, xlrec->mapped_rel,
1136  (uint32) (xlrec->start_lsn >> 32),
1137  (uint32) xlrec->start_lsn,
1138  xlrec->mapped_xid, XLogRecGetXid(r));
1139 
1140  fd = OpenTransientFile(path,
1141  O_CREAT | O_WRONLY | PG_BINARY);
1142  if (fd < 0)
1143  ereport(ERROR,
1144  (errcode_for_file_access(),
1145  errmsg("could not create file \"%s\": %m", path)));
1146 
1147  /*
1148  * Truncate all data that's not guaranteed to have been safely fsynced (by
1149  * previous record or by the last checkpoint).
1150  */
1151  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1152  if (ftruncate(fd, xlrec->offset) != 0)
1153  ereport(ERROR,
1154  (errcode_for_file_access(),
1155  errmsg("could not truncate file \"%s\" to %u: %m",
1156  path, (uint32) xlrec->offset)));
1157  pgstat_report_wait_end();
1158 
1159  /* now seek to the position we want to write our data to */
1160  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1161  ereport(ERROR,
1162  (errcode_for_file_access(),
1163  errmsg("could not seek to end of file \"%s\": %m",
1164  path)));
1165 
1166  data = XLogRecGetData(r) + sizeof(*xlrec);
1167 
1168  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1169 
1170  /* write out tail end of mapping file (again) */
1171  errno = 0;
1172  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1173  if (write(fd, data, len) != len)
1174  {
1175  /* if write didn't set errno, assume problem is no disk space */
1176  if (errno == 0)
1177  errno = ENOSPC;
1178  ereport(ERROR,
1179  (errcode_for_file_access(),
1180  errmsg("could not write to file \"%s\": %m", path)));
1181  }
1182  pgstat_report_wait_end();
1183 
1184  /*
1185  * Now fsync all previously written data. We could improve things and only
1186  * do this for the last write to a file, but the required bookkeeping
1187  * doesn't seem worth the trouble.
1188  */
1189  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1190  if (pg_fsync(fd) != 0)
1191  ereport(data_sync_elevel(ERROR),
1192  (errcode_for_file_access(),
1193  errmsg("could not fsync file \"%s\": %m", path)));
1194  pgstat_report_wait_end();
1195 
1196  if (CloseTransientFile(fd) != 0)
1197  ereport(ERROR,
1198  (errcode_for_file_access(),
1199  errmsg("could not close file \"%s\": %m", path)));
1200 }
#define write(a, b, c)
Definition: win32.h:14
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1222
#define XLogRecGetData(decoder)
Definition: xlogreader.h:283
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:631
unsigned int uint32
Definition: c.h:359
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:281
int data_sync_elevel(int elevel)
Definition: fd.c:3519
TransactionId mapped_xid
Definition: heapam_xlog.h:378
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:822
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:330
#define snprintf
Definition: port.h:192
#define ftruncate(a, b)
Definition: win32_port.h:60

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 805 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

806 {
807  HASHCTL hash_ctl;
808  TransactionId logical_xmin;
809 
810  /*
811  * We only need to persist these mappings if the rewritten table can be
812  * accessed during logical decoding, if not, we can skip doing any
813  * additional work.
814  */
815  state->rs_logical_rewrite =
816  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
817 
818  if (!state->rs_logical_rewrite)
819  return;
820 
821  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
822 
823  /*
824  * If there are no logical slots in progress we don't need to do anything,
825  * there cannot be any remappings for relevant rows yet. The relation's
826  * lock protects us against races.
827  */
828  if (logical_xmin == InvalidTransactionId)
829  {
830  state->rs_logical_rewrite = false;
831  return;
832  }
833 
834  state->rs_logical_xmin = logical_xmin;
835  state->rs_begin_lsn = GetXLogInsertRecPtr();
836  state->rs_num_rewrite_mappings = 0;
837 
838  memset(&hash_ctl, 0, sizeof(hash_ctl));
839  hash_ctl.keysize = sizeof(TransactionId);
840  hash_ctl.entrysize = sizeof(RewriteMappingFile);
841  hash_ctl.hcxt = state->rs_cxt;
842 
843  state->rs_logical_mappings =
844  hash_create("Logical rewrite mapping",
845  128, /* arbitrary initial size */
846  &hash_ctl,
847  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
848 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:145
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:514
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3117
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:155
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11191
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:151
HTAB * rs_logical_mappings
Definition: rewriteheap.c:154
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:149
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:578
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:134

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 952 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

953 {
954  HASH_SEQ_STATUS seq_status;
955  RewriteMappingFile *src;
956 
957  /* done, no logical rewrite in progress */
958  if (!state->rs_logical_rewrite)
959  return;
960 
961  /* writeout remaining in-memory entries */
962  if (state->rs_num_rewrite_mappings > 0)
963  logical_heap_rewrite_flush_mappings(state);
964 
965  /* Iterate over all mappings we have written and fsync the files. */
966  hash_seq_init(&seq_status, state->rs_logical_mappings);
967  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
968  {
969  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
970  ereport(data_sync_elevel(ERROR),
971  (errcode_for_file_access(),
972  errmsg("could not fsync file \"%s\": %m", src->path)));
973  FileClose(src->vfd);
974  }
975  /* memory context cleanup will deal with the rest */
976 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:155
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:201
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2049
int errcode_for_file_access(void)
Definition: elog.c:631
#define ereport(elevel, rest)
Definition: elog.h:141
HTAB * rs_logical_mappings
Definition: rewriteheap.c:154
int data_sync_elevel(int elevel)
Definition: fd.c:3519
void FileClose(File file)
Definition: fd.c:1748
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 854 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

855 {
856  HASH_SEQ_STATUS seq_status;
857  RewriteMappingFile *src;
858  dlist_mutable_iter iter;
859 
860  Assert(state->rs_logical_rewrite);
861 
862  /* no logical rewrite in progress, no need to iterate over mappings */
863  if (state->rs_num_rewrite_mappings == 0)
864  return;
865 
866  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
867  state->rs_num_rewrite_mappings);
868 
869  hash_seq_init(&seq_status, state->rs_logical_mappings);
870  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
871  {
872  char *waldata;
873  char *waldata_start;
874  xl_heap_rewrite_mapping xlrec;
875  Oid dboid;
876  uint32 len;
877  int written;
878 
879  /* this file hasn't got any new mappings */
880  if (src->num_mappings == 0)
881  continue;
882 
883  if (state->rs_old_rel->rd_rel->relisshared)
884  dboid = InvalidOid;
885  else
886  dboid = MyDatabaseId;
887 
888  xlrec.num_mappings = src->num_mappings;
889  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
890  xlrec.mapped_xid = src->xid;
891  xlrec.mapped_db = dboid;
892  xlrec.offset = src->off;
893  xlrec.start_lsn = state->rs_begin_lsn;
894 
895  /* write all mappings consecutively */
896  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
897  waldata_start = waldata = palloc(len);
898 
899  /*
900  * collect data we need to write out, but don't modify ondisk data yet
901  */
902  dlist_foreach_modify(iter, &src->mappings)
903  {
904  RewriteMappingDataEntry *pmap;
905 
906  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
907 
908  memcpy(waldata, &pmap->map, sizeof(pmap->map));
909  waldata += sizeof(pmap->map);
910 
911  /* remove from the list and free */
912  dlist_delete(&pmap->node);
913  pfree(pmap);
914 
915  /* update bookkeeping */
916  state->rs_num_rewrite_mappings--;
917  src->num_mappings--;
918  }
919 
920  Assert(src->num_mappings == 0);
921  Assert(waldata == waldata_start + len);
922 
923  /*
924  * Note that we deviate from the usual WAL coding practices here,
925  * check the above "Logical rewrite support" comment for reasoning.
926  */
927  written = FileWrite(src->vfd, waldata_start, len, src->off,
928  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
929  if (written != len)
930  ereport(ERROR,
931  (errcode_for_file_access(),
932  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
933  written, len)));
934  src->off += len;
935 
936  XLogBeginInsert();
937  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
938  XLogRegisterData(waldata_start, len);
939 
940  /* write xlog record */
941  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
942 
943  pfree(waldata_start);
944  }
945  Assert(state->rs_num_rewrite_mappings == 0);
946 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:155
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:201
TransactionId xid
Definition: rewriteheap.c:196
int errcode_for_file_access(void)
Definition: elog.c:631
unsigned int uint32
Definition: c.h:359
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:151
HTAB * rs_logical_mappings
Definition: rewriteheap.c:154
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1951
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:378
#define Assert(condition)
Definition: c.h:739
dlist_head mappings
Definition: rewriteheap.c:200
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
LogicalRewriteMappingData map
Definition: rewriteheap.c:210
#define elog(elevel,...)
Definition: elog.h:228
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:134
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:422

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1049 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1051 {
1052  ItemPointerData new_tid = new_tuple->t_self;
1053  TransactionId cutoff = state->rs_logical_xmin;
1054  TransactionId xmin;
1055  TransactionId xmax;
1056  bool do_log_xmin = false;
1057  bool do_log_xmax = false;
1058  LogicalRewriteMappingData map;
1059 
1060  /* no logical rewrite in progress, we don't need to log anything */
1061  if (!state->rs_logical_rewrite)
1062  return;
1063 
1064  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1065  /* use *GetUpdateXid to correctly deal with multixacts */
1066  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1067 
1068  /*
1069  * Log the mapping iff the tuple has been created recently.
1070  */
1071  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1072  do_log_xmin = true;
1073 
1074  if (!TransactionIdIsNormal(xmax))
1075  {
1076  /*
1077  * no xmax is set, can't have any permanent ones, so this check is
1078  * sufficient
1079  */
1080  }
1081  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1082  {
1083  /* only locked, we don't care */
1084  }
1085  else if (!TransactionIdPrecedes(xmax, cutoff))
1086  {
1087  /* tuple has been deleted recently, log */
1088  do_log_xmax = true;
1089  }
1090 
1091  /* if neither needs to be logged, we're done */
1092  if (!do_log_xmin && !do_log_xmax)
1093  return;
1094 
1095  /* fill out mapping information */
1096  map.old_node = state->rs_old_rel->rd_node;
1097  map.old_tid = old_tid;
1098  map.new_node = state->rs_new_rel->rd_node;
1099  map.new_tid = new_tid;
1100 
1101  /* ---
1102  * Now persist the mapping for the individual xids that are affected. We
1103  * need to log for both xmin and xmax if they aren't the same transaction
1104  * since the mapping files are per "affected" xid.
1105  * We don't muster all that much effort detecting whether xmin and xmax
1106  * are actually the same transaction, we just check whether the xid is the
1107  * same disregarding subtransactions. Logging too much is relatively
1108  * harmless and we could never do the check fully since subtransaction
1109  * data is thrown away during restarts.
1110  * ---
1111  */
1112  if (do_log_xmin)
1113  logical_rewrite_log_mapping(state, xmin, &map);
1114  /* separately log mapping for xmax unless it'd be redundant */
1115  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1116  logical_rewrite_log_mapping(state, xmax, &map);
1117 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:145
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:514
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:982
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:54
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:134
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 982 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

984 {
985  RewriteMappingFile *src;
987  Oid relid;
988  bool found;
989 
990  relid = RelationGetRelid(state->rs_old_rel);
991 
992  /* look for existing mappings for this 'mapped' xid */
993  src = hash_search(state->rs_logical_mappings, &xid,
994  HASH_ENTER, &found);
995 
996  /*
997  * We haven't yet had the need to map anything for this xid, create
998  * per-xid data structures.
999  */
1000  if (!found)
1001  {
1002  char path[MAXPGPATH];
1003  Oid dboid;
1004 
1005  if (state->rs_old_rel->rd_rel->relisshared)
1006  dboid = InvalidOid;
1007  else
1008  dboid = MyDatabaseId;
1009 
1010  snprintf(path, MAXPGPATH,
1011  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1012  dboid, relid,
1013  (uint32) (state->rs_begin_lsn >> 32),
1014  (uint32) state->rs_begin_lsn,
1015  xid, GetCurrentTransactionId());
1016 
1017  dlist_init(&src->mappings);
1018  src->num_mappings = 0;
1019  src->off = 0;
1020  memcpy(src->path, path, sizeof(path));
1021  src->vfd = PathNameOpenFile(path,
1022  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1023  if (src->vfd < 0)
1024  ereport(ERROR,
1026  errmsg("could not create file \"%s\": %m", path)));
1027  }
1028 
1029  pmap = MemoryContextAlloc(state->rs_cxt,
1030  sizeof(RewriteMappingDataEntry));
1031  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1032  dlist_push_tail(&src->mappings, &pmap->node);
1033  src->num_mappings++;
1034  state->rs_num_rewrite_mappings++;
1035 
1036  /*
1037  * Write out buffer every time we've too many in-memory entries across all
1038  * mapping files.
1039  */
1040  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1042 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1358
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1222
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:155
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:201
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
int errcode_for_file_access(void)
Definition: elog.c:631
unsigned int uint32
Definition: c.h:359
#define ereport(elevel, rest)
Definition: elog.h:141
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:151
HTAB * rs_logical_mappings
Definition: rewriteheap.c:154
MemoryContext rs_cxt
Definition: rewriteheap.c:149
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:200
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:854
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:822
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
LogicalRewriteMappingData map
Definition: rewriteheap.c:210
Relation rs_old_rel
Definition: rewriteheap.c:134
#define snprintf
Definition: port.h:192
#define RelationGetRelid(relation)
Definition: rel.h:422

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 623 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, HEAP_INSERT_SKIP_WAL, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

624 {
625  Page page = state->rs_buffer;
626  Size pageFreeSpace,
627  saveFreeSpace;
628  Size len;
629  OffsetNumber newoff;
630  HeapTuple heaptup;
631 
632  /*
633  * If the new tuple is too big for storage or contains already toasted
634  * out-of-line attributes from some other relation, invoke the toaster.
635  *
636  * Note: below this point, heaptup is the data we actually intend to store
637  * into the relation; tup is the caller's original untoasted data.
638  */
639  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
640  {
641  /* toast table entries should never be recursively toasted */
643  heaptup = tup;
644  }
645  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
646  {
648 
649  if (!state->rs_use_wal)
650  options |= HEAP_INSERT_SKIP_WAL;
651 
652  /*
653  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
654  * for the TOAST table are not logically decoded. The main heap is
655  * WAL-logged as XLOG FPI records, which are not logically decoded.
656  */
657  options |= HEAP_INSERT_NO_LOGICAL;
658 
659  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
660  options);
661  }
662  else
663  heaptup = tup;
664 
665  len = MAXALIGN(heaptup->t_len); /* be conservative */
666 
667  /*
668  * If we're gonna fail for oversize tuple, do it right away
669  */
670  if (len > MaxHeapTupleSize)
671  ereport(ERROR,
672  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
673  errmsg("row is too big: size %zu, maximum size %zu",
674  len, MaxHeapTupleSize)));
675 
676  /* Compute desired extra freespace due to fillfactor option */
677  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
679 
680  /* Now we can check to see if there's enough free space already. */
681  if (state->rs_buffer_valid)
682  {
683  pageFreeSpace = PageGetHeapFreeSpace(page);
684 
685  if (len + saveFreeSpace > pageFreeSpace)
686  {
687  /* Doesn't fit, so write out the existing page */
688 
689  /* XLOG stuff */
690  if (state->rs_use_wal)
691  log_newpage(&state->rs_new_rel->rd_node,
692  MAIN_FORKNUM,
693  state->rs_blockno,
694  page,
695  true);
696 
697  /*
698  * Now write the page. We say skipFsync = true because there's no
699  * need for smgr to schedule an fsync for this write; we'll do it
700  * ourselves in end_heap_rewrite.
701  */
703 
704  PageSetChecksumInplace(page, state->rs_blockno);
705 
707  state->rs_blockno, (char *) page, true);
708 
709  state->rs_blockno++;
710  state->rs_buffer_valid = false;
711  }
712  }
713 
714  if (!state->rs_buffer_valid)
715  {
716  /* Initialize a new empty page */
717  PageInit(page, BLCKSZ, 0);
718  state->rs_buffer_valid = true;
719  }
720 
721  /* And now we can insert the tuple into the page */
722  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
723  InvalidOffsetNumber, false, true);
724  if (newoff == InvalidOffsetNumber)
725  elog(ERROR, "failed to add tuple");
726 
727  /* Update caller's t_self to the actual position where it was stored */
728  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
729 
730  /*
731  * Insert the correct position into CTID of the stored tuple, too, if the
732  * caller didn't supply a valid CTID.
733  */
734  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
735  {
736  ItemId newitemid;
737  HeapTupleHeader onpage_tup;
738 
739  newitemid = PageGetItemId(page, newoff);
740  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
741 
742  onpage_tup->t_ctid = tup->t_self;
743  }
744 
745  /* If heaptup is a private copy, release it. */
746  if (heaptup != tup)
747  heap_freetuple(heaptup);
748 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:56
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:135
int errcode(int sqlerrcode)
Definition: elog.c:608
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:32
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:479
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:665
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
#define ereport(elevel, rest)
Definition: elog.h:141
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:306
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:54
#define Assert(condition)
Definition: c.h:739
size_t Size
Definition: c.h:467
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1198
#define MAXALIGN(LEN)
Definition: c.h:692
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:483
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:33
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:673
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:35
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog(elevel,...)
Definition: elog.h:228
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:277
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:94
BlockNumber rs_blockno
Definition: rewriteheap.c:137
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 573 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

574 {
575  /*
576  * If we have already seen an earlier tuple in the update chain that
577  * points to this tuple, let's forget about that earlier tuple. It's in
578  * fact dead as well, our simple xmax < OldestXmin test in
579  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
580  * when xmin of a tuple is greater than xmax, which sounds
581  * counter-intuitive but is perfectly valid.
582  *
583  * We don't bother to try to detect the situation the other way round,
584  * when we encounter the dead tuple first and then the recently dead one
585  * that points to it. If that happens, we'll have some unmatched entries
586  * in the UnresolvedTups hash table at the end. That can happen anyway,
587  * because a vacuum might have removed the dead tuple in the chain before
588  * us.
589  */
590  UnresolvedTup unresolved;
591  TidHashKey hashkey;
592  bool found;
593 
594  memset(&hashkey, 0, sizeof(hashkey));
595  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
596  hashkey.tid = old_tuple->t_self;
597 
598  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
599  HASH_FIND, NULL);
600 
601  if (unresolved != NULL)
602  {
603  /* Need to free the contained tuple as well as the hashtable entry */
604  heap_freetuple(unresolved->tuple);
605  hash_search(state->rs_unresolved_tups, &hashkey,
606  HASH_REMOVE, &found);
607  Assert(found);
608  return true;
609  }
610 
611  return false;
612 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:152
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:167
#define Assert(condition)
Definition: c.h:739
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:166

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 371 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

373 {
374  MemoryContext old_cxt;
375  ItemPointerData old_tid;
376  TidHashKey hashkey;
377  bool found;
378  bool free_new;
379 
380  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
381 
382  /*
383  * Copy the original tuple's visibility information into new_tuple.
384  *
385  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
386  * we intentionally clear the HOT status bits.
387  */
388  memcpy(&new_tuple->t_data->t_choice.t_heap,
389  &old_tuple->t_data->t_choice.t_heap,
390  sizeof(HeapTupleFields));
391 
392  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
393  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
394  new_tuple->t_data->t_infomask |=
395  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
396 
397  /*
398  * While we have our hands on the tuple, we may as well freeze any
399  * eligible xmin or xmax, so that future VACUUM effort can be saved.
400  */
401  heap_freeze_tuple(new_tuple->t_data,
402  state->rs_old_rel->rd_rel->relfrozenxid,
403  state->rs_old_rel->rd_rel->relminmxid,
404  state->rs_freeze_xid,
405  state->rs_cutoff_multi);
406 
407  /*
408  * Invalid ctid means that ctid should point to the tuple itself. We'll
409  * override it later if the tuple is part of an update chain.
410  */
411  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
412 
413  /*
414  * If the tuple has been updated, check the old-to-new mapping hash table.
415  */
416  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
417  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
419  !(ItemPointerEquals(&(old_tuple->t_self),
420  &(old_tuple->t_data->t_ctid))))
421  {
422  OldToNewMapping mapping;
423 
424  memset(&hashkey, 0, sizeof(hashkey));
425  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
426  hashkey.tid = old_tuple->t_data->t_ctid;
427 
428  mapping = (OldToNewMapping)
429  hash_search(state->rs_old_new_tid_map, &hashkey,
430  HASH_FIND, NULL);
431 
432  if (mapping != NULL)
433  {
434  /*
435  * We've already copied the tuple that t_ctid points to, so we can
436  * set the ctid of this tuple to point to the new location, and
437  * insert it right away.
438  */
439  new_tuple->t_data->t_ctid = mapping->new_tid;
440 
441  /* We don't need the mapping entry anymore */
442  hash_search(state->rs_old_new_tid_map, &hashkey,
443  HASH_REMOVE, &found);
444  Assert(found);
445  }
446  else
447  {
448  /*
449  * We haven't seen the tuple t_ctid points to yet. Stash this
450  * tuple into unresolved_tups to be written later.
451  */
452  UnresolvedTup unresolved;
453 
454  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
455  HASH_ENTER, &found);
456  Assert(!found);
457 
458  unresolved->old_tid = old_tuple->t_self;
459  unresolved->tuple = heap_copytuple(new_tuple);
460 
461  /*
462  * We can't do anything more now, since we don't know where the
463  * tuple will be written.
464  */
465  MemoryContextSwitchTo(old_cxt);
466  return;
467  }
468  }
469 
470  /*
471  * Now we will write the tuple, and then check to see if it is the B tuple
472  * in any new or known pair. When we resolve a known pair, we will be
473  * able to write that pair's A tuple, and then we have to check if it
474  * resolves some other pair. Hence, we need a loop here.
475  */
476  old_tid = old_tuple->t_self;
477  free_new = false;
478 
479  for (;;)
480  {
481  ItemPointerData new_tid;
482 
483  /* Insert the tuple and find out where it's put in new_heap */
484  raw_heap_insert(state, new_tuple);
485  new_tid = new_tuple->t_self;
486 
487  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
488 
489  /*
490  * If the tuple is the updated version of a row, and the prior version
491  * wouldn't be DEAD yet, then we need to either resolve the prior
492  * version (if it's waiting in rs_unresolved_tups), or make an entry
493  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
494  * previous tuple's xmax would equal this one's xmin, so it's
495  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
496  */
497  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
499  state->rs_oldest_xmin))
500  {
501  /*
502  * Okay, this is B in an update pair. See if we've seen A.
503  */
504  UnresolvedTup unresolved;
505 
506  memset(&hashkey, 0, sizeof(hashkey));
507  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
508  hashkey.tid = old_tid;
509 
510  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
511  HASH_FIND, NULL);
512 
513  if (unresolved != NULL)
514  {
515  /*
516  * We have seen and memorized the previous tuple already. Now
517  * that we know where we inserted the tuple its t_ctid points
518  * to, fix its t_ctid and insert it to the new heap.
519  */
520  if (free_new)
521  heap_freetuple(new_tuple);
522  new_tuple = unresolved->tuple;
523  free_new = true;
524  old_tid = unresolved->old_tid;
525  new_tuple->t_data->t_ctid = new_tid;
526 
527  /*
528  * We don't need the hash entry anymore, but don't free its
529  * tuple just yet.
530  */
531  hash_search(state->rs_unresolved_tups, &hashkey,
532  HASH_REMOVE, &found);
533  Assert(found);
534 
535  /* loop back to insert the previous tuple in the chain */
536  continue;
537  }
538  else
539  {
540  /*
541  * Remember the new tid of this tuple. We'll use it to set the
542  * ctid when we find the previous tuple in the chain.
543  */
544  OldToNewMapping mapping;
545 
546  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
547  HASH_ENTER, &found);
548  Assert(!found);
549 
550  mapping->new_tid = new_tid;
551  }
552  }
553 
554  /* Done with this (chain of) tuples, for now */
555  if (free_new)
556  heap_freetuple(new_tuple);
557  break;
558  }
559 
560  MemoryContextSwitchTo(old_cxt);
561 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1049
TransactionId rs_freeze_xid
Definition: rewriteheap.c:143
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:147
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:176
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:185
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:152
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6368
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:141
MemoryContext rs_cxt
Definition: rewriteheap.c:149
ItemPointerData tid
Definition: rewriteheap.c:167
#define Assert(condition)
Definition: c.h:739
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:153
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:166
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:623
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:134
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:188