PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 187 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 179 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 237 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, and RewriteStateData::rs_unresolved_tups.

Referenced by heapam_relation_copy_for_cluster().

239 {
240  RewriteState state;
241  MemoryContext rw_cxt;
242  MemoryContext old_cxt;
243  HASHCTL hash_ctl;
244 
245  /*
246  * To ease cleanup, make a separate context that will contain the
247  * RewriteState struct itself plus all subsidiary data.
248  */
249  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
250  "Table rewrite",
251  ALLOCSET_DEFAULT_SIZES);
252  old_cxt = MemoryContextSwitchTo(rw_cxt);
253 
254  /* Create and fill in the state struct */
255  state = palloc0(sizeof(RewriteStateData));
256 
257  state->rs_old_rel = old_heap;
258  state->rs_new_rel = new_heap;
259  state->rs_buffer = (Page) palloc(BLCKSZ);
260  /* new_heap needn't be empty, just locked */
261  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
262  state->rs_buffer_valid = false;
263  state->rs_oldest_xmin = oldest_xmin;
264  state->rs_freeze_xid = freeze_xid;
265  state->rs_cutoff_multi = cutoff_multi;
266  state->rs_cxt = rw_cxt;
267 
268  /* Initialize hash tables used to track update chains */
269  hash_ctl.keysize = sizeof(TidHashKey);
270  hash_ctl.entrysize = sizeof(UnresolvedTupData);
271  hash_ctl.hcxt = state->rs_cxt;
272 
273  state->rs_unresolved_tups =
274  hash_create("Rewrite / Unresolved ctids",
275  128, /* arbitrary initial size */
276  &hash_ctl,
277  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
278 
279  hash_ctl.entrysize = sizeof(OldToNewMappingData);
280 
281  state->rs_old_new_tid_map =
282  hash_create("Rewrite / Old to new tid map",
283  128, /* arbitrary initial size */
284  &hash_ctl,
285  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
286 
287  MemoryContextSwitchTo(old_cxt);
288 
289  logical_begin_heap_rewrite(state);
290 
291  return state;
292 }
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
Relation rs_new_rel
Definition: rewriteheap.c:135
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
void * palloc0(Size size)
Definition: mcxt.c:1093
Size keysize
Definition: hsearch.h:75
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:213
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:801
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
Definition: regguts.h:317
void * palloc(Size size)
Definition: mcxt.c:1062
Relation rs_old_rel
Definition: rewriteheap.c:134
BlockNumber rs_blockno
Definition: rewriteheap.c:137
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1199 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat::st_mode, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1200 {
1201  XLogRecPtr cutoff;
1202  XLogRecPtr redo;
1203  DIR *mappings_dir;
1204  struct dirent *mapping_de;
1205  char path[MAXPGPATH + 20];
1206 
1207  /*
1208  * We start off with a minimum of the last redo pointer. No new decoding
1209  * slot will start before that, so that's a safe upper bound for removal.
1210  */
1211  redo = GetRedoRecPtr();
1212 
1213  /* now check for the restart ptrs from existing slots */
1214  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1215 
1216  /* don't start earlier than the restart lsn */
1217  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1218  cutoff = redo;
1219 
1220  mappings_dir = AllocateDir("pg_logical/mappings");
1221  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1222  {
1223  struct stat statbuf;
1224  Oid dboid;
1225  Oid relid;
1226  XLogRecPtr lsn;
1227  TransactionId rewrite_xid;
1228  TransactionId create_xid;
1229  uint32 hi,
1230  lo;
1231 
1232  if (strcmp(mapping_de->d_name, ".") == 0 ||
1233  strcmp(mapping_de->d_name, "..") == 0)
1234  continue;
1235 
1236  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1237  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1238  continue;
1239 
1240  /* Skip over files that cannot be ours. */
1241  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1242  continue;
1243 
1244  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1245  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1246  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1247 
1248  lsn = ((uint64) hi) << 32 | lo;
1249 
1250  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1251  {
1252  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1253  if (unlink(path) < 0)
1254  ereport(ERROR,
1255  (errcode_for_file_access(),
1256  errmsg("could not remove file \"%s\": %m", path)));
1257  }
1258  else
1259  {
1260  /* on some operating systems fsyncing a file requires O_RDWR */
1261  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1262 
1263  /*
1264  * The file cannot vanish due to concurrency since this function
1265  * is the only one removing logical mappings and only one
1266  * checkpoint can be in progress at a time.
1267  */
1268  if (fd < 0)
1269  ereport(ERROR,
1270  (errcode_for_file_access(),
1271  errmsg("could not open file \"%s\": %m", path)));
1272 
1273  /*
1274  * We could try to avoid fsyncing files that either haven't
1275  * changed or have only been created since the checkpoint's start,
1276  * but it's currently not deemed worth the effort.
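1277  */
1278  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1279  if (pg_fsync(fd) != 0)
1280  ereport(data_sync_elevel(ERROR),
1281  (errcode_for_file_access(),
1282  errmsg("could not fsync file \"%s\": %m", path)));
1283  pgstat_report_wait_end();
1284 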
1285  if (CloseTransientFile(fd) != 0)
1286  ereport(ERROR,
1287  (errcode_for_file_access(),
1288  errmsg("could not close file \"%s\": %m", path)));
1289  }
1290  }
1291  FreeDir(mappings_dir);
1292 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2423
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:884
unsigned int uint32
Definition: c.h:441
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2634
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:261
#define S_ISREG(m)
Definition: win32_port.h:319
int CloseTransientFile(int fd)
Definition: fd.c:2600
int data_sync_elevel(int elevel)
Definition: fd.c:3714
#define ereport(elevel,...)
Definition: elog.h:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2700
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8529
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int pg_fsync(int fd)
Definition: fd.c:352
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:216
int FreeDir(DIR *dir)
Definition: fd.c:2752

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 300 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, smgrextend(), smgrimmedsync(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

301 {
302  HASH_SEQ_STATUS seq_status;
303  UnresolvedTup unresolved;
304 
305  /*
306  * Write any remaining tuples in the UnresolvedTups table. If we have any
307  * left, they should in fact be dead, but let's err on the safe side.
308  */
309  hash_seq_init(&seq_status, state->rs_unresolved_tups);
310 
311  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
312  {
313  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
314  raw_heap_insert(state, unresolved->tuple);
315  }
316 
317  /* Write the last page, if any */
318  if (state->rs_buffer_valid)
319  {
320  if (RelationNeedsWAL(state->rs_new_rel))
321  log_newpage(&state->rs_new_rel->rd_node,
322  MAIN_FORKNUM,
323  state->rs_blockno,
324  state->rs_buffer,
325  true);
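326 
327  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
328 
329  RelationOpenSmgr(state->rs_new_rel);
330  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
331  (char *) state->rs_buffer, true);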
332  }
333 
334  /*
335  * When we WAL-logged rel pages, we must nonetheless fsync them. The
336  * reason is the same as in storage.c's RelationCopyStorage(): we're
337  * writing data that's not in shared buffers, and so a CHECKPOINT
338  * occurring during the rewriteheap operation won't have fsync'd data we
339  * wrote before the checkpoint.
340  */
341  if (RelationNeedsWAL(state->rs_new_rel))
342  {
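343  /* for an empty table, this could be first smgr access */
344  RelationOpenSmgr(state->rs_new_rel);
345  smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM);
346  }
347 
348  logical_end_heap_rewrite(state);
349 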
350  /* Deleting the context frees everything */
351  MemoryContextDelete(state->rs_cxt);
352 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:947
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:529
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
MemoryContext rs_cxt
Definition: rewriteheap.c:148
RelFileNode rd_node
Definition: rel.h:55
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1532
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
#define RelationNeedsWAL(relation)
Definition: rel.h:585
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:996
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:618
BlockNumber rs_blockno
Definition: rewriteheap.c:137
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:660

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState r)

Definition at line 1117 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1118 {
1119  char path[MAXPGPATH];
1120  int fd;
1121  xl_heap_rewrite_mapping *xlrec;
1122  uint32 len;
1123  char *data;
1124 
1125  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1126 
1127  snprintf(path, MAXPGPATH,
1128  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1129  xlrec->mapped_db, xlrec->mapped_rel,
1130  LSN_FORMAT_ARGS(xlrec->start_lsn),
1131  xlrec->mapped_xid, XLogRecGetXid(r));
1132 
1133  fd = OpenTransientFile(path,
1134  O_CREAT | O_WRONLY | PG_BINARY);
1135  if (fd < 0)
1136  ereport(ERROR,
1137  (errcode_for_file_access(),
1138  errmsg("could not create file \"%s\": %m", path)));
1139 
1140  /*
1141  * Truncate all data that's not guaranteed to have been safely fsynced (by
1142  * previous record or by the last checkpoint).
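1143  */
1144  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1145  if (ftruncate(fd, xlrec->offset) != 0)
1146  ereport(ERROR,
1147  (errcode_for_file_access(),
1148  errmsg("could not truncate file \"%s\" to %u: %m",
1149  path, (uint32) xlrec->offset)));
1150  pgstat_report_wait_end();
1151 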
1152  data = XLogRecGetData(r) + sizeof(*xlrec);
1153 
1154  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1155 
1156  /* write out tail end of mapping file (again) */
1157  errno = 0;
1158  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1159  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1160  {
1161  /* if write didn't set errno, assume problem is no disk space */
1162  if (errno == 0)
1163  errno = ENOSPC;
1164  ereport(ERROR,
1165  (errcode_for_file_access(),
1166  errmsg("could not write to file \"%s\": %m", path)));
1167  }
1168  pgstat_report_wait_end();
1169 
1170  /*
1171  * Now fsync all previously written data. We could improve things and only
1172  * do this for the last write to a file, but the required bookkeeping
1173  * doesn't seem worth the trouble.
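1174  */
1175  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1176  if (pg_fsync(fd) != 0)
1177  ereport(data_sync_elevel(ERROR),
1178  (errcode_for_file_access(),
1179  errmsg("could not fsync file \"%s\": %m", path)));
1180  pgstat_report_wait_end();
1181 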
1182  if (CloseTransientFile(fd) != 0)
1183  ereport(ERROR,
1184  (errcode_for_file_access(),
1185  errmsg("could not close file \"%s\": %m", path)));
1186 }
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:382
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2423
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:261
int CloseTransientFile(int fd)
Definition: fd.c:2600
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:379
int data_sync_elevel(int elevel)
Definition: fd.c:3714
#define ereport(elevel,...)
Definition: elog.h:157
TransactionId mapped_xid
Definition: heapam_xlog.h:384
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:909
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:352
#define snprintf
Definition: port.h:216
#define ftruncate(a, b)
Definition: win32_port.h:65

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 801 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

802 {
803  HASHCTL hash_ctl;
804  TransactionId logical_xmin;
805 
806  /*
807  * We only need to persist these mappings if the rewritten table can be
808  * accessed during logical decoding, if not, we can skip doing any
809  * additional work.
810  */
811  state->rs_logical_rewrite =
812  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
813 
814  if (!state->rs_logical_rewrite)
815  return;
816 
817  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
818 
819  /*
820  * If there are no logical slots in progress we don't need to do anything,
821  * there cannot be any remappings for relevant rows yet. The relation's
822  * lock protects us against races.
823  */
824  if (logical_xmin == InvalidTransactionId)
825  {
826  state->rs_logical_rewrite = false;
827  return;
828  }
829 
830  state->rs_logical_xmin = logical_xmin;
831  state->rs_begin_lsn = GetXLogInsertRecPtr();
832  state->rs_num_rewrite_mappings = 0;
833 
834  hash_ctl.keysize = sizeof(TransactionId);
835  hash_ctl.entrysize = sizeof(RewriteMappingFile);
836  hash_ctl.hcxt = state->rs_cxt;
837 
838  state->rs_logical_mappings =
839  hash_create("Logical rewrite mapping",
840  128, /* arbitrary initial size */
841  &hash_ctl,
842  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
843 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
Size entrysize
Definition: hsearch.h:76
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3824
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11747
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:641
Size keysize
Definition: hsearch.h:75
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:134

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 947 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

948 {
949  HASH_SEQ_STATUS seq_status;
950  RewriteMappingFile *src;
951 
952  /* done, no logical rewrite in progress */
953  if (!state->rs_logical_rewrite)
954  return;
955 
956  /* writeout remaining in-memory entries */
957  if (state->rs_num_rewrite_mappings > 0)
958  logical_heap_rewrite_flush_mappings(state);
959 
960  /* Iterate over all mappings we have written and fsync the files. */
961  hash_seq_init(&seq_status, state->rs_logical_mappings);
962  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
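963  {
964  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
965  ereport(data_sync_elevel(ERROR),
966  (errcode_for_file_access(),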
967  errmsg("could not fsync file \"%s\": %m", src->path)));
968  FileClose(src->vfd);
969  }
970  /* memory context cleanup will deal with the rest */
971 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2178
int errcode_for_file_access(void)
Definition: elog.c:721
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
int data_sync_elevel(int elevel)
Definition: fd.c:3714
#define ereport(elevel,...)
Definition: elog.h:157
void FileClose(File file)
Definition: fd.c:1873
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 849 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

850 {
851  HASH_SEQ_STATUS seq_status;
852  RewriteMappingFile *src;
853  dlist_mutable_iter iter;
854 
855  Assert(state->rs_logical_rewrite);
856 
857  /* no logical rewrite in progress, no need to iterate over mappings */
858  if (state->rs_num_rewrite_mappings == 0)
859  return;
860 
861  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
862  state->rs_num_rewrite_mappings);
863 
864  hash_seq_init(&seq_status, state->rs_logical_mappings);
865  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
866  {
867  char *waldata;
868  char *waldata_start;
869  xl_heap_rewrite_mapping xlrec;
870  Oid dboid;
871  uint32 len;
872  int written;
873 
874  /* this file hasn't got any new mappings */
875  if (src->num_mappings == 0)
876  continue;
877 
878  if (state->rs_old_rel->rd_rel->relisshared)
879  dboid = InvalidOid;
880  else
881  dboid = MyDatabaseId;
882 
883  xlrec.num_mappings = src->num_mappings;
884  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
885  xlrec.mapped_xid = src->xid;
886  xlrec.mapped_db = dboid;
887  xlrec.offset = src->off;
888  xlrec.start_lsn = state->rs_begin_lsn;
889 
890  /* write all mappings consecutively */
891  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
892  waldata_start = waldata = palloc(len);
893 
894  /*
895  * collect data we need to write out, but don't modify ondisk data yet
896  */
897  dlist_foreach_modify(iter, &src->mappings)
898  {
899  RewriteMappingDataEntry *pmap;
900 
901  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
902 
903  memcpy(waldata, &pmap->map, sizeof(pmap->map));
904  waldata += sizeof(pmap->map);
905 
906  /* remove from the list and free */
907  dlist_delete(&pmap->node);
908  pfree(pmap);
909 
910  /* update bookkeeping */
911  state->rs_num_rewrite_mappings--;
912  src->num_mappings--;
913  }
914 
915  Assert(src->num_mappings == 0);
916  Assert(waldata == waldata_start + len);
917 
918  /*
919  * Note that we deviate from the usual WAL coding practices here,
920  * check the above "Logical rewrite support" comment for reasoning.
921  */
922  written = FileWrite(src->vfd, waldata_start, len, src->off,
923  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
924  if (written != len)
925  ereport(ERROR,
926  (errcode_for_file_access(),
927  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
928  written, len)));
929  src->off += len;
930 
931  XLogBeginInsert();
932  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
933  XLogRegisterData(waldata_start, len);
934 
935  /* write xlog record */
936  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
937 
938  pfree(waldata_start);
939  }
940  Assert(state->rs_num_rewrite_mappings == 0);
941 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
TransactionId xid
Definition: rewriteheap.c:195
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2080
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
TransactionId mapped_xid
Definition: heapam_xlog.h:384
#define Assert(condition)
Definition: c.h:804
dlist_head mappings
Definition: rewriteheap.c:199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
#define elog(elevel,...)
Definition: elog.h:232
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:134
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define RelationGetRelid(relation)
Definition: rel.h:472

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1043 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1045 {
1046  ItemPointerData new_tid = new_tuple->t_self;
1047  TransactionId cutoff = state->rs_logical_xmin;
1048  TransactionId xmin;
1049  TransactionId xmax;
1050  bool do_log_xmin = false;
1051  bool do_log_xmax = false;
1052  LogicalRewriteMappingData map;
1053 
1054  /* no logical rewrite in progress, we don't need to log anything */
1055  if (!state->rs_logical_rewrite)
1056  return;
1057 
1058  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1059  /* use *GetUpdateXid to correctly deal with multixacts */
1060  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1061 
1062  /*
1063  * Log the mapping iff the tuple has been created recently.
1064  */
1065  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1066  do_log_xmin = true;
1067 
1068  if (!TransactionIdIsNormal(xmax))
1069  {
1070  /*
1071  * no xmax is set, can't have any permanent ones, so this check is
1072  * sufficient
1073  */
1074  }
1075  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1076  {
1077  /* only locked, we don't care */
1078  }
1079  else if (!TransactionIdPrecedes(xmax, cutoff))
1080  {
1081  /* tuple has been deleted recently, log */
1082  do_log_xmax = true;
1083  }
1084 
1085  /* if neither needs to be logged, we're done */
1086  if (!do_log_xmin && !do_log_xmax)
1087  return;
1088 
1089  /* fill out mapping information */
1090  map.old_node = state->rs_old_rel->rd_node;
1091  map.old_tid = old_tid;
1092  map.new_node = state->rs_new_rel->rd_node;
1093  map.new_tid = new_tid;
1094 
1095  /* ---
1096  * Now persist the mapping for the individual xids that are affected. We
1097  * need to log for both xmin and xmax if they aren't the same transaction
1098  * since the mapping files are per "affected" xid.
1099  * We don't muster all that much effort detecting whether xmin and xmax
1100  * are actually the same transaction, we just check whether the xid is the
1101  * same disregarding subtransactions. Logging too much is relatively
1102  * harmless and we could never do the check fully since subtransaction
1103  * data is thrown away during restarts.
1104  * ---
1105  */
1106  if (do_log_xmin)
1107  logical_rewrite_log_mapping(state, xmin, &map);
1108  /* separately log mapping for xmax unless it'd be redundant */
1109  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1110  logical_rewrite_log_mapping(state, xmax, &map);
1111 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:587
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:977
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:55
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:134
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData *  map 
)
static

Definition at line 977 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

979 {
980  RewriteMappingFile *src;
982  Oid relid;
983  bool found;
984 
985  relid = RelationGetRelid(state->rs_old_rel);
986 
987  /* look for existing mappings for this 'mapped' xid */
988  src = hash_search(state->rs_logical_mappings, &xid,
989  HASH_ENTER, &found);
990 
991  /*
992  * We haven't yet had the need to map anything for this xid, create
993  * per-xid data structures.
994  */
995  if (!found)
996  {
997  char path[MAXPGPATH];
998  Oid dboid;
999 
1000  if (state->rs_old_rel->rd_rel->relisshared)
1001  dboid = InvalidOid;
1002  else
1003  dboid = MyDatabaseId;
1004 
1005  snprintf(path, MAXPGPATH,
1006  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1007  dboid, relid,
1008  LSN_FORMAT_ARGS(state->rs_begin_lsn),
1009  xid, GetCurrentTransactionId());
1010 
1011  dlist_init(&src->mappings);
1012  src->num_mappings = 0;
1013  src->off = 0;
1014  memcpy(src->path, path, sizeof(path));
1015  src->vfd = PathNameOpenFile(path,
1016  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1017  if (src->vfd < 0)
1018  ereport(ERROR,
1020  errmsg("could not create file \"%s\": %m", path)));
1021  }
1022 
1023  pmap = MemoryContextAlloc(state->rs_cxt,
1024  sizeof(RewriteMappingDataEntry));
1025  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1026  dlist_push_tail(&src->mappings, &pmap->node);
1027  src->num_mappings++;
1028  state->rs_num_rewrite_mappings++;
1029 
1030  /*
1031  * Write out buffer every time we've too many in-memory entries across all
1032  * mapping files.
1033  */
1034  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1036 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1484
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1271
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
int errcode_for_file_access(void)
Definition: elog.c:721
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
MemoryContext rs_cxt
Definition: rewriteheap.c:148
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define ereport(elevel,...)
Definition: elog.h:157
dlist_head mappings
Definition: rewriteheap.c:199
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:849
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
Relation rs_old_rel
Definition: rewriteheap.c:134
#define snprintf
Definition: port.h:216
#define RelationGetRelid(relation)
Definition: rel.h:472

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 618 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

619 {
620  Page page = state->rs_buffer;
621  Size pageFreeSpace,
622  saveFreeSpace;
623  Size len;
624  OffsetNumber newoff;
625  HeapTuple heaptup;
626 
627  /*
628  * If the new tuple is too big for storage or contains already toasted
629  * out-of-line attributes from some other relation, invoke the toaster.
630  *
631  * Note: below this point, heaptup is the data we actually intend to store
632  * into the relation; tup is the caller's original untoasted data.
633  */
634  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
635  {
636  /* toast table entries should never be recursively toasted */
638  heaptup = tup;
639  }
640  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
641  {
643 
644  /*
645  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
646  * for the TOAST table are not logically decoded. The main heap is
647  * WAL-logged as XLOG FPI records, which are not logically decoded.
648  */
649  options |= HEAP_INSERT_NO_LOGICAL;
650 
651  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
652  options);
653  }
654  else
655  heaptup = tup;
656 
657  len = MAXALIGN(heaptup->t_len); /* be conservative */
658 
659  /*
660  * If we're gonna fail for oversize tuple, do it right away
661  */
662  if (len > MaxHeapTupleSize)
663  ereport(ERROR,
664  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
665  errmsg("row is too big: size %zu, maximum size %zu",
666  len, MaxHeapTupleSize)));
667 
668  /* Compute desired extra freespace due to fillfactor option */
669  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
671 
672  /* Now we can check to see if there's enough free space already. */
673  if (state->rs_buffer_valid)
674  {
675  pageFreeSpace = PageGetHeapFreeSpace(page);
676 
677  if (len + saveFreeSpace > pageFreeSpace)
678  {
679  /*
680  * Doesn't fit, so write out the existing page. It always
681  * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
682  * enforce saveFreeSpace unconditionally.
683  */
684 
685  /* XLOG stuff */
686  if (RelationNeedsWAL(state->rs_new_rel))
687  log_newpage(&state->rs_new_rel->rd_node,
688  MAIN_FORKNUM,
689  state->rs_blockno,
690  page,
691  true);
692 
693  /*
694  * Now write the page. We say skipFsync = true because there's no
695  * need for smgr to schedule an fsync for this write; we'll do it
696  * ourselves in end_heap_rewrite.
697  */
699 
700  PageSetChecksumInplace(page, state->rs_blockno);
701 
703  state->rs_blockno, (char *) page, true);
704 
705  state->rs_blockno++;
706  state->rs_buffer_valid = false;
707  }
708  }
709 
710  if (!state->rs_buffer_valid)
711  {
712  /* Initialize a new empty page */
713  PageInit(page, BLCKSZ, 0);
714  state->rs_buffer_valid = true;
715  }
716 
717  /* And now we can insert the tuple into the page */
718  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
719  InvalidOffsetNumber, false, true);
720  if (newoff == InvalidOffsetNumber)
721  elog(ERROR, "failed to add tuple");
722 
723  /* Update caller's t_self to the actual position where it was stored */
724  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
725 
726  /*
727  * Insert the correct position into CTID of the stored tuple, too, if the
728  * caller didn't supply a valid CTID.
729  */
730  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
731  {
732  ItemId newitemid;
733  HeapTupleHeader onpage_tup;
734 
735  newitemid = PageGetItemId(page, newoff);
736  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
737 
738  onpage_tup->t_ctid = tup->t_self;
739  }
740 
741  /* If heaptup is a private copy, release it. */
742  if (heaptup != tup)
743  heap_freetuple(heaptup);
744 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:135
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
Form_pg_class rd_rel
Definition: rel.h:110
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:529
#define ERROR
Definition: elog.h:46
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:984
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:559
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:356
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
#define InvalidOffsetNumber
Definition: off.h:26
#define ereport(elevel,...)
Definition: elog.h:157
RelFileNode rd_node
Definition: rel.h:55
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1532
#define MAXALIGN(LEN)
Definition: c.h:757
#define RelationNeedsWAL(relation)
Definition: rel.h:585
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:34
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:672
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:36
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:996
#define elog(elevel,...)
Definition: elog.h:232
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:327
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
BlockNumber rs_blockno
Definition: rewriteheap.c:137
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 568 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

569 {
570  /*
571  * If we have already seen an earlier tuple in the update chain that
572  * points to this tuple, let's forget about that earlier tuple. It's in
573  * fact dead as well, our simple xmax < OldestXmin test in
574  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
575  * when xmin of a tuple is greater than xmax, which sounds
576  * counter-intuitive but is perfectly valid.
577  *
578  * We don't bother to try to detect the situation the other way round,
579  * when we encounter the dead tuple first and then the recently dead one
580  * that points to it. If that happens, we'll have some unmatched entries
581  * in the UnresolvedTups hash table at the end. That can happen anyway,
582  * because a vacuum might have removed the dead tuple in the chain before
583  * us.
584  */
585  UnresolvedTup unresolved;
586  TidHashKey hashkey;
587  bool found;
588 
589  memset(&hashkey, 0, sizeof(hashkey));
590  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
591  hashkey.tid = old_tuple->t_self;
592 
593  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
594  HASH_FIND, NULL);
595 
596  if (unresolved != NULL)
597  {
598  /* Need to free the contained tuple as well as the hashtable entry */
599  heap_freetuple(unresolved->tuple);
600  hash_search(state->rs_unresolved_tups, &hashkey,
601  HASH_REMOVE, &found);
602  Assert(found);
603  return true;
604  }
605 
606  return false;
607 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:165

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 366 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

368 {
369  MemoryContext old_cxt;
370  ItemPointerData old_tid;
371  TidHashKey hashkey;
372  bool found;
373  bool free_new;
374 
375  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
376 
377  /*
378  * Copy the original tuple's visibility information into new_tuple.
379  *
380  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
381  * we intentionally clear the HOT status bits.
382  */
383  memcpy(&new_tuple->t_data->t_choice.t_heap,
384  &old_tuple->t_data->t_choice.t_heap,
385  sizeof(HeapTupleFields));
386 
387  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
388  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
389  new_tuple->t_data->t_infomask |=
390  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
391 
392  /*
393  * While we have our hands on the tuple, we may as well freeze any
394  * eligible xmin or xmax, so that future VACUUM effort can be saved.
395  */
396  heap_freeze_tuple(new_tuple->t_data,
397  state->rs_old_rel->rd_rel->relfrozenxid,
398  state->rs_old_rel->rd_rel->relminmxid,
399  state->rs_freeze_xid,
400  state->rs_cutoff_multi);
401 
402  /*
403  * Invalid ctid means that ctid should point to the tuple itself. We'll
404  * override it later if the tuple is part of an update chain.
405  */
406  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
407 
408  /*
409  * If the tuple has been updated, check the old-to-new mapping hash table.
410  */
411  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
412  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
414  !(ItemPointerEquals(&(old_tuple->t_self),
415  &(old_tuple->t_data->t_ctid))))
416  {
417  OldToNewMapping mapping;
418 
419  memset(&hashkey, 0, sizeof(hashkey));
420  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
421  hashkey.tid = old_tuple->t_data->t_ctid;
422 
423  mapping = (OldToNewMapping)
424  hash_search(state->rs_old_new_tid_map, &hashkey,
425  HASH_FIND, NULL);
426 
427  if (mapping != NULL)
428  {
429  /*
430  * We've already copied the tuple that t_ctid points to, so we can
431  * set the ctid of this tuple to point to the new location, and
432  * insert it right away.
433  */
434  new_tuple->t_data->t_ctid = mapping->new_tid;
435 
436  /* We don't need the mapping entry anymore */
437  hash_search(state->rs_old_new_tid_map, &hashkey,
438  HASH_REMOVE, &found);
439  Assert(found);
440  }
441  else
442  {
443  /*
444  * We haven't seen the tuple t_ctid points to yet. Stash this
445  * tuple into unresolved_tups to be written later.
446  */
447  UnresolvedTup unresolved;
448 
449  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
450  HASH_ENTER, &found);
451  Assert(!found);
452 
453  unresolved->old_tid = old_tuple->t_self;
454  unresolved->tuple = heap_copytuple(new_tuple);
455 
456  /*
457  * We can't do anything more now, since we don't know where the
458  * tuple will be written.
459  */
460  MemoryContextSwitchTo(old_cxt);
461  return;
462  }
463  }
464 
465  /*
466  * Now we will write the tuple, and then check to see if it is the B tuple
467  * in any new or known pair. When we resolve a known pair, we will be
468  * able to write that pair's A tuple, and then we have to check if it
469  * resolves some other pair. Hence, we need a loop here.
470  */
471  old_tid = old_tuple->t_self;
472  free_new = false;
473 
474  for (;;)
475  {
476  ItemPointerData new_tid;
477 
478  /* Insert the tuple and find out where it's put in new_heap */
479  raw_heap_insert(state, new_tuple);
480  new_tid = new_tuple->t_self;
481 
482  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
483 
484  /*
485  * If the tuple is the updated version of a row, and the prior version
486  * wouldn't be DEAD yet, then we need to either resolve the prior
487  * version (if it's waiting in rs_unresolved_tups), or make an entry
488  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
489  * previous tuple's xmax would equal this one's xmin, so it's
490  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
491  */
492  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
494  state->rs_oldest_xmin))
495  {
496  /*
497  * Okay, this is B in an update pair. See if we've seen A.
498  */
499  UnresolvedTup unresolved;
500 
501  memset(&hashkey, 0, sizeof(hashkey));
502  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
503  hashkey.tid = old_tid;
504 
505  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
506  HASH_FIND, NULL);
507 
508  if (unresolved != NULL)
509  {
510  /*
511  * We have seen and memorized the previous tuple already. Now
512  * that we know where we inserted the tuple its t_ctid points
513  * to, fix its t_ctid and insert it to the new heap.
514  */
515  if (free_new)
516  heap_freetuple(new_tuple);
517  new_tuple = unresolved->tuple;
518  free_new = true;
519  old_tid = unresolved->old_tid;
520  new_tuple->t_data->t_ctid = new_tid;
521 
522  /*
523  * We don't need the hash entry anymore, but don't free its
524  * tuple just yet.
525  */
526  hash_search(state->rs_unresolved_tups, &hashkey,
527  HASH_REMOVE, &found);
528  Assert(found);
529 
530  /* loop back to insert the previous tuple in the chain */
531  continue;
532  }
533  else
534  {
535  /*
536  * Remember the new tid of this tuple. We'll use it to set the
537  * ctid when we find the previous tuple in the chain.
538  */
539  OldToNewMapping mapping;
540 
541  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
542  HASH_ENTER, &found);
543  Assert(!found);
544 
545  mapping->new_tid = new_tid;
546  }
547  }
548 
549  /* Done with this (chain of) tuples, for now */
550  if (free_new)
551  heap_freetuple(new_tuple);
552  break;
553  }
554 
555  MemoryContextSwitchTo(old_cxt);
556 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1043
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:175
Form_pg_class rd_rel
Definition: rel.h:110
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
union HeapTupleHeaderData::@43 t_choice
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:184
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6709
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
MemoryContext rs_cxt
Definition: rewriteheap.c:148
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:165
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:618
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:134
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:187