PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "storage/procarray.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

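◆ OldToNewMapping

typedef OldToNewMappingData *OldToNewMapping

◆ RewriteMappingDataEntry

typedef struct RewriteMappingDataEntry RewriteMappingDataEntry

◆ RewriteMappingFile

typedef struct RewriteMappingFile RewriteMappingFile

◆ RewriteStateData

typedef struct RewriteStateData RewriteStateData

◆ UnresolvedTup

typedef UnresolvedTupData *UnresolvedTup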

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 247 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by heapam_relation_copy_for_cluster().

250 {
252  MemoryContext rw_cxt;
253  MemoryContext old_cxt;
254  HASHCTL hash_ctl;
255 
256  /*
257  * To ease cleanup, make a separate context that will contain the
258  * RewriteState struct itself plus all subsidiary data.
259  */
261  "Table rewrite",
263  old_cxt = MemoryContextSwitchTo(rw_cxt);
264 
265  /* Create and fill in the state struct */
266  state = palloc0(sizeof(RewriteStateData));
267 
268  state->rs_old_rel = old_heap;
269  state->rs_new_rel = new_heap;
270  state->rs_buffer = (Page) palloc(BLCKSZ);
271  /* new_heap needn't be empty, just locked */
272  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
273  state->rs_buffer_valid = false;
274  state->rs_use_wal = use_wal;
275  state->rs_oldest_xmin = oldest_xmin;
276  state->rs_freeze_xid = freeze_xid;
277  state->rs_cutoff_multi = cutoff_multi;
278  state->rs_cxt = rw_cxt;
279 
280  /* Initialize hash tables used to track update chains */
281  memset(&hash_ctl, 0, sizeof(hash_ctl));
282  hash_ctl.keysize = sizeof(TidHashKey);
283  hash_ctl.entrysize = sizeof(UnresolvedTupData);
284  hash_ctl.hcxt = state->rs_cxt;
285 
286  state->rs_unresolved_tups =
287  hash_create("Rewrite / Unresolved ctids",
288  128, /* arbitrary initial size */
289  &hash_ctl,
291 
292  hash_ctl.entrysize = sizeof(OldToNewMappingData);
293 
294  state->rs_old_new_tid_map =
295  hash_create("Rewrite / Old to new tid map",
296  128, /* arbitrary initial size */
297  &hash_ctl,
299 
300  MemoryContextSwitchTo(old_cxt);
301 
303 
304  return state;
305 }
#define AllocSetContextCreate
Definition: memutils.h:169
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:151
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:155
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:143
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:160
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:149
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:157
void * palloc0(Size size)
Definition: mcxt.c:955
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:198
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:814
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:161
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:924
Relation rs_old_rel
Definition: rewriteheap.c:142
BlockNumber rs_blockno
Definition: rewriteheap.c:145
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1222 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1223 {
1224  XLogRecPtr cutoff;
1225  XLogRecPtr redo;
1226  DIR *mappings_dir;
1227  struct dirent *mapping_de;
1228  char path[MAXPGPATH + 20];
1229 
1230  /*
1231  * We start of with a minimum of the last redo pointer. No new decoding
1232  * slot will start before that, so that's a safe upper bound for removal.
1233  */
1234  redo = GetRedoRecPtr();
1235 
1236  /* now check for the restart ptrs from existing slots */
1238 
1239  /* don't start earlier than the restart lsn */
1240  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1241  cutoff = redo;
1242 
1243  mappings_dir = AllocateDir("pg_logical/mappings");
1244  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1245  {
1246  struct stat statbuf;
1247  Oid dboid;
1248  Oid relid;
1249  XLogRecPtr lsn;
1250  TransactionId rewrite_xid;
1251  TransactionId create_xid;
1252  uint32 hi,
1253  lo;
1254 
1255  if (strcmp(mapping_de->d_name, ".") == 0 ||
1256  strcmp(mapping_de->d_name, "..") == 0)
1257  continue;
1258 
1259  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1260  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1261  continue;
1262 
1263  /* Skip over files that cannot be ours. */
1264  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1265  continue;
1266 
1267  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1268  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1269  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1270 
1271  lsn = ((uint64) hi) << 32 | lo;
1272 
1273  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1274  {
1275  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1276  if (unlink(path) < 0)
1277  ereport(ERROR,
1279  errmsg("could not remove file \"%s\": %m", path)));
1280  }
1281  else
1282  {
1283  int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1284 
1285  /*
1286  * The file cannot vanish due to concurrency since this function
1287  * is the only one removing logical mappings and it's run while
1288  * CheckpointLock is held exclusively.
1289  */
1290  if (fd < 0)
1291  ereport(ERROR,
1293  errmsg("could not open file \"%s\": %m", path)));
1294 
1295  /*
1296  * We could try to avoid fsyncing files that either haven't
1297  * changed or have only been created since the checkpoint's start,
1298  * but it's currently not deemed worth the effort.
1299  */
1301  if (pg_fsync(fd) != 0)
1304  errmsg("could not fsync file \"%s\": %m", path)));
1306 
1307  if (CloseTransientFile(fd) != 0)
1308  ereport(ERROR,
1310  errmsg("could not close file \"%s\": %m", path)));
1311  }
1312  }
1313  FreeDir(mappings_dir);
1314 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:507
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:593
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:791
unsigned int uint32
Definition: c.h:358
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2468
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
#define S_ISREG(m)
Definition: win32_port.h:308
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define stat(a, b)
Definition: win32_port.h:264
int data_sync_elevel(int elevel)
Definition: fd.c:3485
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2534
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8171
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:253
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
int pg_fsync(int fd)
Definition: fd.c:333
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:192
int FreeDir(DIR *dir)
Definition: fd.c:2586

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 313 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

314 {
315  HASH_SEQ_STATUS seq_status;
316  UnresolvedTup unresolved;
317 
318  /*
319  * Write any remaining tuples in the UnresolvedTups table. If we have any
320  * left, they should in fact be dead, but let's err on the safe side.
321  */
322  hash_seq_init(&seq_status, state->rs_unresolved_tups);
323 
324  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
325  {
326  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
327  raw_heap_insert(state, unresolved->tuple);
328  }
329 
330  /* Write the last page, if any */
331  if (state->rs_buffer_valid)
332  {
333  if (state->rs_use_wal)
334  log_newpage(&state->rs_new_rel->rd_node,
335  MAIN_FORKNUM,
336  state->rs_blockno,
337  state->rs_buffer,
338  true);
340 
342 
344  (char *) state->rs_buffer, true);
345  }
346 
347  /*
348  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
349  * to ensure that the toast table gets fsync'd too.
350  *
351  * It's obvious that we must do this when not WAL-logging. It's less
352  * obvious that we have to do it even if we did WAL-log the pages. The
353  * reason is the same as in storage.c's RelationCopyStorage(): we're
354  * writing data that's not in shared buffers, and so a CHECKPOINT
355  * occurring during the rewriteheap operation won't have fsync'd data we
356  * wrote before the checkpoint.
357  */
358  if (RelationNeedsWAL(state->rs_new_rel))
359  heap_sync(state->rs_new_rel);
360 
362 
363  /* Deleting the context frees everything */
364  MemoryContextDelete(state->rs_cxt);
365 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:961
struct SMgrRelationData * rd_smgr
Definition: rel.h:56
Relation rs_new_rel
Definition: rewriteheap.c:143
void heap_sync(Relation rel)
Definition: heapam.c:8935
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:473
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:160
MemoryContext rs_cxt
Definition: rewriteheap.c:157
RelFileNode rd_node
Definition: rel.h:54
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1198
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
#define RelationNeedsWAL(relation)
Definition: rel.h:518
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:537
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:631
BlockNumber rs_blockno
Definition: rewriteheap.c:145

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState r)

Definition at line 1132 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1133 {
1134  char path[MAXPGPATH];
1135  int fd;
1136  xl_heap_rewrite_mapping *xlrec;
1137  uint32 len;
1138  char *data;
1139 
1140  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1141 
1142  snprintf(path, MAXPGPATH,
1143  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1144  xlrec->mapped_db, xlrec->mapped_rel,
1145  (uint32) (xlrec->start_lsn >> 32),
1146  (uint32) xlrec->start_lsn,
1147  xlrec->mapped_xid, XLogRecGetXid(r));
1148 
1149  fd = OpenTransientFile(path,
1150  O_CREAT | O_WRONLY | PG_BINARY);
1151  if (fd < 0)
1152  ereport(ERROR,
1154  errmsg("could not create file \"%s\": %m", path)));
1155 
1156  /*
1157  * Truncate all data that's not guaranteed to have been safely fsynced (by
1158  * previous record or by the last checkpoint).
1159  */
1161  if (ftruncate(fd, xlrec->offset) != 0)
1162  ereport(ERROR,
1164  errmsg("could not truncate file \"%s\" to %u: %m",
1165  path, (uint32) xlrec->offset)));
1167 
1168  /* now seek to the position we want to write our data to */
1169  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1170  ereport(ERROR,
1172  errmsg("could not seek to end of file \"%s\": %m",
1173  path)));
1174 
1175  data = XLogRecGetData(r) + sizeof(*xlrec);
1176 
1177  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1178 
1179  /* write out tail end of mapping file (again) */
1180  errno = 0;
1182  if (write(fd, data, len) != len)
1183  {
1184  /* if write didn't set errno, assume problem is no disk space */
1185  if (errno == 0)
1186  errno = ENOSPC;
1187  ereport(ERROR,
1189  errmsg("could not write to file \"%s\": %m", path)));
1190  }
1192 
1193  /*
1194  * Now fsync all previously written data. We could improve things and only
1195  * do this for the last write to a file, but the required bookkeeping
1196  * doesn't seem worth the trouble.
1197  */
1199  if (pg_fsync(fd) != 0)
1202  errmsg("could not fsync file \"%s\": %m", path)));
1204 
1205  if (CloseTransientFile(fd) != 0)
1206  ereport(ERROR,
1208  errmsg("could not close file \"%s\": %m", path)));
1209 }
#define write(a, b, c)
Definition: win32.h:14
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1191
#define XLogRecGetData(decoder)
Definition: xlogreader.h:237
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
#define ereport(elevel, rest)
Definition: elog.h:141
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:235
int data_sync_elevel(int elevel)
Definition: fd.c:3485
TransactionId mapped_xid
Definition: heapam_xlog.h:378
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:784
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:333
#define snprintf
Definition: port.h:192
#define ftruncate(a, b)
Definition: win32_port.h:60

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 814 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

815 {
816  HASHCTL hash_ctl;
817  TransactionId logical_xmin;
818 
819  /*
820  * We only need to persist these mappings if the rewritten table can be
821  * accessed during logical decoding, if not, we can skip doing any
822  * additional work.
823  */
824  state->rs_logical_rewrite =
826 
827  if (!state->rs_logical_rewrite)
828  return;
829 
830  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
831 
832  /*
833  * If there are no logical slots in progress we don't need to do anything,
834  * there cannot be any remappings for relevant rows yet. The relation's
835  * lock protects us against races.
836  */
837  if (logical_xmin == InvalidTransactionId)
838  {
839  state->rs_logical_rewrite = false;
840  return;
841  }
842 
843  state->rs_logical_xmin = logical_xmin;
845  state->rs_num_rewrite_mappings = 0;
846 
847  memset(&hash_ctl, 0, sizeof(hash_ctl));
848  hash_ctl.keysize = sizeof(TransactionId);
849  hash_ctl.entrysize = sizeof(RewriteMappingFile);
850  hash_ctl.hcxt = state->rs_cxt;
851 
852  state->rs_logical_mappings =
853  hash_create("Logical rewrite mapping",
854  128, /* arbitrary initial size */
855  &hash_ctl,
857 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:153
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:507
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3003
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:163
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11149
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:159
HTAB * rs_logical_mappings
Definition: rewriteheap.c:162
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:157
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:572
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:142

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 961 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

962 {
963  HASH_SEQ_STATUS seq_status;
964  RewriteMappingFile *src;
965 
966  /* done, no logical rewrite in progress */
967  if (!state->rs_logical_rewrite)
968  return;
969 
970  /* writeout remaining in-memory entries */
971  if (state->rs_num_rewrite_mappings > 0)
973 
974  /* Iterate over all mappings we have written and fsync the files. */
975  hash_seq_init(&seq_status, state->rs_logical_mappings);
976  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
977  {
981  errmsg("could not fsync file \"%s\": %m", src->path)));
982  FileClose(src->vfd);
983  }
984  /* memory context cleanup will deal with the rest */
985 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:163
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:209
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2014
int errcode_for_file_access(void)
Definition: elog.c:593
#define ereport(elevel, rest)
Definition: elog.h:141
HTAB * rs_logical_mappings
Definition: rewriteheap.c:162
int data_sync_elevel(int elevel)
Definition: fd.c:3485
void FileClose(File file)
Definition: fd.c:1713
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 863 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

864 {
865  HASH_SEQ_STATUS seq_status;
866  RewriteMappingFile *src;
867  dlist_mutable_iter iter;
868 
869  Assert(state->rs_logical_rewrite);
870 
871  /* no logical rewrite in progress, no need to iterate over mappings */
872  if (state->rs_num_rewrite_mappings == 0)
873  return;
874 
875  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
876  state->rs_num_rewrite_mappings);
877 
878  hash_seq_init(&seq_status, state->rs_logical_mappings);
879  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
880  {
881  char *waldata;
882  char *waldata_start;
884  Oid dboid;
885  uint32 len;
886  int written;
887 
888  /* this file hasn't got any new mappings */
889  if (src->num_mappings == 0)
890  continue;
891 
892  if (state->rs_old_rel->rd_rel->relisshared)
893  dboid = InvalidOid;
894  else
895  dboid = MyDatabaseId;
896 
897  xlrec.num_mappings = src->num_mappings;
898  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
899  xlrec.mapped_xid = src->xid;
900  xlrec.mapped_db = dboid;
901  xlrec.offset = src->off;
902  xlrec.start_lsn = state->rs_begin_lsn;
903 
904  /* write all mappings consecutively */
905  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
906  waldata_start = waldata = palloc(len);
907 
908  /*
909  * collect data we need to write out, but don't modify ondisk data yet
910  */
911  dlist_foreach_modify(iter, &src->mappings)
912  {
914 
915  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
916 
917  memcpy(waldata, &pmap->map, sizeof(pmap->map));
918  waldata += sizeof(pmap->map);
919 
920  /* remove from the list and free */
921  dlist_delete(&pmap->node);
922  pfree(pmap);
923 
924  /* update bookkeeping */
925  state->rs_num_rewrite_mappings--;
926  src->num_mappings--;
927  }
928 
929  Assert(src->num_mappings == 0);
930  Assert(waldata == waldata_start + len);
931 
932  /*
933  * Note that we deviate from the usual WAL coding practices here,
934  * check the above "Logical rewrite support" comment for reasoning.
935  */
936  written = FileWrite(src->vfd, waldata_start, len, src->off,
938  if (written != len)
939  ereport(ERROR,
941  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
942  written, len)));
943  src->off += len;
944 
945  XLogBeginInsert();
946  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
947  XLogRegisterData(waldata_start, len);
948 
949  /* write xlog record */
950  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
951 
952  pfree(waldata_start);
953  }
954  Assert(state->rs_num_rewrite_mappings == 0);
955 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:163
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:209
TransactionId xid
Definition: rewriteheap.c:204
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:159
HTAB * rs_logical_mappings
Definition: rewriteheap.c:162
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1916
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:378
#define Assert(condition)
Definition: c.h:732
dlist_head mappings
Definition: rewriteheap.c:208
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
LogicalRewriteMappingData map
Definition: rewriteheap.c:218
#define elog(elevel,...)
Definition: elog.h:226
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:142
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:416

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1058 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1060 {
1061  ItemPointerData new_tid = new_tuple->t_self;
1062  TransactionId cutoff = state->rs_logical_xmin;
1063  TransactionId xmin;
1064  TransactionId xmax;
1065  bool do_log_xmin = false;
1066  bool do_log_xmax = false;
1068 
1069  /* no logical rewrite in progress, we don't need to log anything */
1070  if (!state->rs_logical_rewrite)
1071  return;
1072 
1073  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1074  /* use *GetUpdateXid to correctly deal with multixacts */
1075  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1076 
1077  /*
1078  * Log the mapping iff the tuple has been created recently.
1079  */
1080  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1081  do_log_xmin = true;
1082 
1083  if (!TransactionIdIsNormal(xmax))
1084  {
1085  /*
1086  * no xmax is set, can't have any permanent ones, so this check is
1087  * sufficient
1088  */
1089  }
1090  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1091  {
1092  /* only locked, we don't care */
1093  }
1094  else if (!TransactionIdPrecedes(xmax, cutoff))
1095  {
1096  /* tuple has been deleted recently, log */
1097  do_log_xmax = true;
1098  }
1099 
1100  /* if neither needs to be logged, we're done */
1101  if (!do_log_xmin && !do_log_xmax)
1102  return;
1103 
1104  /* fill out mapping information */
1105  map.old_node = state->rs_old_rel->rd_node;
1106  map.old_tid = old_tid;
1107  map.new_node = state->rs_new_rel->rd_node;
1108  map.new_tid = new_tid;
1109 
1110  /* ---
1111  * Now persist the mapping for the individual xids that are affected. We
1112  * need to log for both xmin and xmax if they aren't the same transaction
1113  * since the mapping files are per "affected" xid.
1114  * We don't muster all that much effort detecting whether xmin and xmax
1115  * are actually the same transaction, we just check whether the xid is the
1116  * same disregarding subtransactions. Logging too much is relatively
1117  * harmless and we could never do the check fully since subtransaction
1118  * data is thrown away during restarts.
1119  * ---
1120  */
1121  if (do_log_xmin)
1122  logical_rewrite_log_mapping(state, xmin, &map);
1123  /* separately log mapping for xmax unless it'd be redundant */
1124  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1125  logical_rewrite_log_mapping(state, xmax, &map);
1126 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:153
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:507
Relation rs_new_rel
Definition: rewriteheap.c:143
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:991
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:54
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:142
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData map 
)
static

Definition at line 991 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

993 {
994  RewriteMappingFile *src;
996  Oid relid;
997  bool found;
998 
999  relid = RelationGetRelid(state->rs_old_rel);
1000 
1001  /* look for existing mappings for this 'mapped' xid */
1002  src = hash_search(state->rs_logical_mappings, &xid,
1003  HASH_ENTER, &found);
1004 
1005  /*
1006  * We haven't yet had the need to map anything for this xid, create
1007  * per-xid data structures.
1008  */
1009  if (!found)
1010  {
1011  char path[MAXPGPATH];
1012  Oid dboid;
1013 
1014  if (state->rs_old_rel->rd_rel->relisshared)
1015  dboid = InvalidOid;
1016  else
1017  dboid = MyDatabaseId;
1018 
1019  snprintf(path, MAXPGPATH,
1020  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1021  dboid, relid,
1022  (uint32) (state->rs_begin_lsn >> 32),
1023  (uint32) state->rs_begin_lsn,
1024  xid, GetCurrentTransactionId());
1025 
1026  dlist_init(&src->mappings);
1027  src->num_mappings = 0;
1028  src->off = 0;
1029  memcpy(src->path, path, sizeof(path));
1030  src->vfd = PathNameOpenFile(path,
1031  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1032  if (src->vfd < 0)
1033  ereport(ERROR,
1035  errmsg("could not create file \"%s\": %m", path)));
1036  }
1037 
1038  pmap = MemoryContextAlloc(state->rs_cxt,
1039  sizeof(RewriteMappingDataEntry));
1040  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1041  dlist_push_tail(&src->mappings, &pmap->node);
1042  src->num_mappings++;
1043  state->rs_num_rewrite_mappings++;
1044 
1045  /*
1046  * Write out buffer every time we've too many in-memory entries across all
1047  * mapping files.
1048  */
1049  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1051 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1323
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1191
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:163
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:209
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
int errcode_for_file_access(void)
Definition: elog.c:593
unsigned int uint32
Definition: c.h:358
#define ereport(elevel, rest)
Definition: elog.h:141
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:159
HTAB * rs_logical_mappings
Definition: rewriteheap.c:162
MemoryContext rs_cxt
Definition: rewriteheap.c:157
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:208
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:863
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:784
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
LogicalRewriteMappingData map
Definition: rewriteheap.c:218
Relation rs_old_rel
Definition: rewriteheap.c:142
#define snprintf
Definition: port.h:192
#define RelationGetRelid(relation)
Definition: rel.h:416

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 631 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, HEAP_INSERT_SKIP_WAL, HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, toast_insert_or_update(), and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

632 {
633  Page page = state->rs_buffer;
634  Size pageFreeSpace,
635  saveFreeSpace;
636  Size len;
637  OffsetNumber newoff;
638  HeapTuple heaptup;
639 
640  /*
641  * If the new tuple is too big for storage or contains already toasted
642  * out-of-line attributes from some other relation, invoke the toaster.
643  *
644  * Note: below this point, heaptup is the data we actually intend to store
645  * into the relation; tup is the caller's original untoasted data.
646  */
647  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
648  {
649  /* toast table entries should never be recursively toasted */
651  heaptup = tup;
652  }
653  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
654  {
656 
657  if (!state->rs_use_wal)
658  options |= HEAP_INSERT_SKIP_WAL;
659 
660  /*
661  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
662  * for the TOAST table are not logically decoded. The main heap is
663  * WAL-logged as XLOG FPI records, which are not logically decoded.
664  */
665  options |= HEAP_INSERT_NO_LOGICAL;
666 
667  heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
668  options);
669  }
670  else
671  heaptup = tup;
672 
673  len = MAXALIGN(heaptup->t_len); /* be conservative */
674 
675  /*
676  * If we're gonna fail for oversize tuple, do it right away
677  */
678  if (len > MaxHeapTupleSize)
679  ereport(ERROR,
680  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
681  errmsg("row is too big: size %zu, maximum size %zu",
682  len, MaxHeapTupleSize)));
683 
684  /* Compute desired extra freespace due to fillfactor option */
685  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
687 
688  /* Now we can check to see if there's enough free space already. */
689  if (state->rs_buffer_valid)
690  {
691  pageFreeSpace = PageGetHeapFreeSpace(page);
692 
693  if (len + saveFreeSpace > pageFreeSpace)
694  {
695  /* Doesn't fit, so write out the existing page */
696 
697  /* XLOG stuff */
698  if (state->rs_use_wal)
699  log_newpage(&state->rs_new_rel->rd_node,
700  MAIN_FORKNUM,
701  state->rs_blockno,
702  page,
703  true);
704 
705  /*
706  * Now write the page. We say isTemp = true even if it's not a
707  * temp table, because there's no need for smgr to schedule an
708  * fsync for this write; we'll do it ourselves in
709  * end_heap_rewrite.
710  */
712 
713  PageSetChecksumInplace(page, state->rs_blockno);
714 
716  state->rs_blockno, (char *) page, true);
717 
718  state->rs_blockno++;
719  state->rs_buffer_valid = false;
720  }
721  }
722 
723  if (!state->rs_buffer_valid)
724  {
725  /* Initialize a new empty page */
726  PageInit(page, BLCKSZ, 0);
727  state->rs_buffer_valid = true;
728  }
729 
730  /* And now we can insert the tuple into the page */
731  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
732  InvalidOffsetNumber, false, true);
733  if (newoff == InvalidOffsetNumber)
734  elog(ERROR, "failed to add tuple");
735 
736  /* Update caller's t_self to the actual position where it was stored */
737  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
738 
739  /*
740  * Insert the correct position into CTID of the stored tuple, too, if the
741  * caller didn't supply a valid CTID.
742  */
743  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
744  {
745  ItemId newitemid;
746  HeapTupleHeader onpage_tup;
747 
748  newitemid = PageGetItemId(page, newoff);
749  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
750 
751  onpage_tup->t_ctid = tup->t_self;
752  }
753 
754  /* If heaptup is a private copy, release it. */
755  if (heaptup != tup)
756  heap_freetuple(heaptup);
757 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
#define TOAST_TUPLE_THRESHOLD
Definition: tuptoaster.h:55
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: tuptoaster.c:536
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:56
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:143
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:32
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:473
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:665
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
#define ereport(elevel, rest)
Definition: elog.h:141
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:307
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:54
#define Assert(condition)
Definition: c.h:732
size_t Size
Definition: c.h:466
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1198
#define MAXALIGN(LEN)
Definition: c.h:685
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:537
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:33
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:673
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:35
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog(elevel,...)
Definition: elog.h:226
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:278
BlockNumber rs_blockno
Definition: rewriteheap.c:145
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 581 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

582 {
583  /*
584  * If we have already seen an earlier tuple in the update chain that
585  * points to this tuple, let's forget about that earlier tuple. It's in
586  * fact dead as well, our simple xmax < OldestXmin test in
587  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
588  * when xmin of a tuple is greater than xmax, which sounds
589  * counter-intuitive but is perfectly valid.
590  *
591  * We don't bother to try to detect the situation the other way round,
592  * when we encounter the dead tuple first and then the recently dead one
593  * that points to it. If that happens, we'll have some unmatched entries
594  * in the UnresolvedTups hash table at the end. That can happen anyway,
595  * because a vacuum might have removed the dead tuple in the chain before
596  * us.
597  */
598  UnresolvedTup unresolved;
599  TidHashKey hashkey;
600  bool found;
601 
602  memset(&hashkey, 0, sizeof(hashkey));
603  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
604  hashkey.tid = old_tuple->t_self;
605 
606  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
607  HASH_FIND, NULL);
608 
609  if (unresolved != NULL)
610  {
611  /* Need to free the contained tuple as well as the hashtable entry */
612  heap_freetuple(unresolved->tuple);
613  hash_search(state->rs_unresolved_tups, &hashkey,
614  HASH_REMOVE, &found);
615  Assert(found);
616  return true;
617  }
618 
619  return false;
620 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:160
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:175
#define Assert(condition)
Definition: c.h:732
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:174

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 379 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

381 {
382  MemoryContext old_cxt;
383  ItemPointerData old_tid;
384  TidHashKey hashkey;
385  bool found;
386  bool free_new;
387 
388  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
389 
390  /*
391  * Copy the original tuple's visibility information into new_tuple.
392  *
393  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
394  * we intentionally clear the HOT status bits.
395  */
396  memcpy(&new_tuple->t_data->t_choice.t_heap,
397  &old_tuple->t_data->t_choice.t_heap,
398  sizeof(HeapTupleFields));
399 
400  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
401  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
402  new_tuple->t_data->t_infomask |=
403  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
404 
405  /*
406  * While we have our hands on the tuple, we may as well freeze any
407  * eligible xmin or xmax, so that future VACUUM effort can be saved.
408  */
409  heap_freeze_tuple(new_tuple->t_data,
410  state->rs_old_rel->rd_rel->relfrozenxid,
411  state->rs_old_rel->rd_rel->relminmxid,
412  state->rs_freeze_xid,
413  state->rs_cutoff_multi);
414 
415  /*
416  * Invalid ctid means that ctid should point to the tuple itself. We'll
417  * override it later if the tuple is part of an update chain.
418  */
419  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
420 
421  /*
422  * If the tuple has been updated, check the old-to-new mapping hash table.
423  */
424  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
425  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
427  !(ItemPointerEquals(&(old_tuple->t_self),
428  &(old_tuple->t_data->t_ctid))))
429  {
430  OldToNewMapping mapping;
431 
432  memset(&hashkey, 0, sizeof(hashkey));
433  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
434  hashkey.tid = old_tuple->t_data->t_ctid;
435 
436  mapping = (OldToNewMapping)
437  hash_search(state->rs_old_new_tid_map, &hashkey,
438  HASH_FIND, NULL);
439 
440  if (mapping != NULL)
441  {
442  /*
443  * We've already copied the tuple that t_ctid points to, so we can
444  * set the ctid of this tuple to point to the new location, and
445  * insert it right away.
446  */
447  new_tuple->t_data->t_ctid = mapping->new_tid;
448 
449  /* We don't need the mapping entry anymore */
450  hash_search(state->rs_old_new_tid_map, &hashkey,
451  HASH_REMOVE, &found);
452  Assert(found);
453  }
454  else
455  {
456  /*
457  * We haven't seen the tuple t_ctid points to yet. Stash this
458  * tuple into unresolved_tups to be written later.
459  */
460  UnresolvedTup unresolved;
461 
462  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
463  HASH_ENTER, &found);
464  Assert(!found);
465 
466  unresolved->old_tid = old_tuple->t_self;
467  unresolved->tuple = heap_copytuple(new_tuple);
468 
469  /*
470  * We can't do anything more now, since we don't know where the
471  * tuple will be written.
472  */
473  MemoryContextSwitchTo(old_cxt);
474  return;
475  }
476  }
477 
478  /*
479  * Now we will write the tuple, and then check to see if it is the B tuple
480  * in any new or known pair. When we resolve a known pair, we will be
481  * able to write that pair's A tuple, and then we have to check if it
482  * resolves some other pair. Hence, we need a loop here.
483  */
484  old_tid = old_tuple->t_self;
485  free_new = false;
486 
487  for (;;)
488  {
489  ItemPointerData new_tid;
490 
491  /* Insert the tuple and find out where it's put in new_heap */
492  raw_heap_insert(state, new_tuple);
493  new_tid = new_tuple->t_self;
494 
495  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
496 
497  /*
498  * If the tuple is the updated version of a row, and the prior version
499  * wouldn't be DEAD yet, then we need to either resolve the prior
500  * version (if it's waiting in rs_unresolved_tups), or make an entry
501  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
502  * previous tuple's xmax would equal this one's xmin, so it's
503  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
504  */
505  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
507  state->rs_oldest_xmin))
508  {
509  /*
510  * Okay, this is B in an update pair. See if we've seen A.
511  */
512  UnresolvedTup unresolved;
513 
514  memset(&hashkey, 0, sizeof(hashkey));
515  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
516  hashkey.tid = old_tid;
517 
518  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
519  HASH_FIND, NULL);
520 
521  if (unresolved != NULL)
522  {
523  /*
524  * We have seen and memorized the previous tuple already. Now
525  * that we know where we inserted the tuple its t_ctid points
526  * to, fix its t_ctid and insert it to the new heap.
527  */
528  if (free_new)
529  heap_freetuple(new_tuple);
530  new_tuple = unresolved->tuple;
531  free_new = true;
532  old_tid = unresolved->old_tid;
533  new_tuple->t_data->t_ctid = new_tid;
534 
535  /*
536  * We don't need the hash entry anymore, but don't free its
537  * tuple just yet.
538  */
539  hash_search(state->rs_unresolved_tups, &hashkey,
540  HASH_REMOVE, &found);
541  Assert(found);
542 
543  /* loop back to insert the previous tuple in the chain */
544  continue;
545  }
546  else
547  {
548  /*
549  * Remember the new tid of this tuple. We'll use it to set the
550  * ctid when we find the previous tuple in the chain.
551  */
552  OldToNewMapping mapping;
553 
554  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
555  HASH_ENTER, &found);
556  Assert(!found);
557 
558  mapping->new_tid = new_tid;
559  }
560  }
561 
562  /* Done with this (chain of) tuples, for now */
563  if (free_new)
564  heap_freetuple(new_tuple);
565  break;
566  }
567 
568  MemoryContextSwitchTo(old_cxt);
569 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1058
TransactionId rs_freeze_xid
Definition: rewriteheap.c:151
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:155
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:184
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:193
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:160
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6368
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:149
MemoryContext rs_cxt
Definition: rewriteheap.c:157
ItemPointerData tid
Definition: rewriteheap.c:175
#define Assert(condition)
Definition: c.h:732
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:161
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:174
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:631
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:142
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:196