PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
#include "storage/procarray.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi, bool use_wal)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi,
bool  use_wal 
)

Definition at line 248 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, and RewriteStateData::rs_use_wal.

Referenced by copy_heap_data().

251 {
252  RewriteState state;
253  MemoryContext rw_cxt;
254  MemoryContext old_cxt;
255  HASHCTL hash_ctl;
256 
257  /*
258  * To ease cleanup, make a separate context that will contain the
259  * RewriteState struct itself plus all subsidiary data.
260  */
261  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
262  "Table rewrite",
263  ALLOCSET_DEFAULT_SIZES);
264  old_cxt = MemoryContextSwitchTo(rw_cxt);
265 
266  /* Create and fill in the state struct */
267  state = palloc0(sizeof(RewriteStateData));
268 
269  state->rs_old_rel = old_heap;
270  state->rs_new_rel = new_heap;
271  state->rs_buffer = (Page) palloc(BLCKSZ);
272  /* new_heap needn't be empty, just locked */
273  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274  state->rs_buffer_valid = false;
275  state->rs_use_wal = use_wal;
276  state->rs_oldest_xmin = oldest_xmin;
277  state->rs_freeze_xid = freeze_xid;
278  state->rs_cutoff_multi = cutoff_multi;
279  state->rs_cxt = rw_cxt;
280 
281  /* Initialize hash tables used to track update chains */
282  memset(&hash_ctl, 0, sizeof(hash_ctl));
283  hash_ctl.keysize = sizeof(TidHashKey);
284  hash_ctl.entrysize = sizeof(UnresolvedTupData);
285  hash_ctl.hcxt = state->rs_cxt;
286 
287  state->rs_unresolved_tups =
288  hash_create("Rewrite / Unresolved ctids",
289  128, /* arbitrary initial size */
290  &hash_ctl,
291  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
292 
293  hash_ctl.entrysize = sizeof(OldToNewMappingData);
294 
295  state->rs_old_new_tid_map =
296  hash_create("Rewrite / Old to new tid map",
297  128, /* arbitrary initial size */
298  &hash_ctl,
299  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
300 
301  MemoryContextSwitchTo(old_cxt);
302 
303  logical_begin_heap_rewrite(state);
304 
305  return state;
306 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:144
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
void * palloc0(Size size)
Definition: mcxt.c:864
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:199
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:802
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:835
Relation rs_old_rel
Definition: rewriteheap.c:143
BlockNumber rs_blockno
Definition: rewriteheap.c:146
Pointer Page
Definition: bufpage.h:74

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1201 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf(), stat, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1202 {
1203  XLogRecPtr cutoff;
1204  XLogRecPtr redo;
1205  DIR *mappings_dir;
1206  struct dirent *mapping_de;
1207  char path[MAXPGPATH + 20];
1208 
1209  /*
1210  * We start of with a minimum of the last redo pointer. No new decoding
1211  * slot will start before that, so that's a safe upper bound for removal.
1212  */
1213  redo = GetRedoRecPtr();
1214 
1215  /* now check for the restart ptrs from existing slots */
1216  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1217 
1218  /* don't start earlier than the restart lsn */
1219  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1220  cutoff = redo;
1221 
1222  mappings_dir = AllocateDir("pg_logical/mappings");
1223  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1224  {
1225  struct stat statbuf;
1226  Oid dboid;
1227  Oid relid;
1228  XLogRecPtr lsn;
1229  TransactionId rewrite_xid;
1230  TransactionId create_xid;
1231  uint32 hi,
1232  lo;
1233 
1234  if (strcmp(mapping_de->d_name, ".") == 0 ||
1235  strcmp(mapping_de->d_name, "..") == 0)
1236  continue;
1237 
1238  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1239  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1240  continue;
1241 
1242  /* Skip over files that cannot be ours. */
1243  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1244  continue;
1245 
1246  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1247  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1248  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1249 
1250  lsn = ((uint64) hi) << 32 | lo;
1251 
1252  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1253  {
1254  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1255  if (unlink(path) < 0)
1256  ereport(ERROR,
1257  (errcode_for_file_access(),
1258  errmsg("could not remove file \"%s\": %m", path)));
1259  }
1260  else
1261  {
1262  int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1263 
1264  /*
1265  * The file cannot vanish due to concurrency since this function
1266  * is the only one removing logical mappings and it's run while
1267  * CheckpointLock is held exclusively.
1268  */
1269  if (fd < 0)
1270  ereport(ERROR,
1271  (errcode_for_file_access(),
1272  errmsg("could not open file \"%s\": %m", path)));
1273 
1274  /*
1275  * We could try to avoid fsyncing files that either haven't
1276  * changed or have only been created since the checkpoint's start,
1277  * but it's currently not deemed worth the effort.
1278  */
1279  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1280  if (pg_fsync(fd) != 0)
1281  ereport(ERROR,
1282  (errcode_for_file_access(),
1283  errmsg("could not fsync file \"%s\": %m", path)));
1284  pgstat_report_wait_end();
1285  CloseTransientFile(fd);
1286  }
1287  }
1288  FreeDir(mappings_dir);
1289 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:463
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1069
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2403
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:784
unsigned int uint32
Definition: c.h:314
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2607
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
#define S_ISREG(m)
Definition: win32_port.h:310
int CloseTransientFile(int fd)
Definition: fd.c:2573
#define stat(a, b)
Definition: win32_port.h:266
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2673
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8192
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:255
int errmsg(const char *fmt,...)
Definition: elog.c:797
int pg_fsync(int fd)
Definition: fd.c:348
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
int FreeDir(DIR *dir)
Definition: fd.c:2725

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 314 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), heap_sync(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by copy_heap_data().

315 {
316  HASH_SEQ_STATUS seq_status;
317  UnresolvedTup unresolved;
318 
319  /*
320  * Write any remaining tuples in the UnresolvedTups table. If we have any
321  * left, they should in fact be dead, but let's err on the safe side.
322  */
323  hash_seq_init(&seq_status, state->rs_unresolved_tups);
324 
325  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326  {
327  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328  raw_heap_insert(state, unresolved->tuple);
329  }
330 
331  /* Write the last page, if any */
332  if (state->rs_buffer_valid)
333  {
334  if (state->rs_use_wal)
335  log_newpage(&state->rs_new_rel->rd_node,
336  MAIN_FORKNUM,
337  state->rs_blockno,
338  state->rs_buffer,
339  true);
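340  RelationOpenSmgr(state->rs_new_rel);
341 
342  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
343 
344  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
345  (char *) state->rs_buffer, true);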
346  }
347 
348  /*
349  * If the rel is WAL-logged, must fsync before commit. We use heap_sync
350  * to ensure that the toast table gets fsync'd too.
351  *
352  * It's obvious that we must do this when not WAL-logging. It's less
353  * obvious that we have to do it even if we did WAL-log the pages. The
354  * reason is the same as in tablecmds.c's copy_relation_data(): we're
355  * writing data that's not in shared buffers, and so a CHECKPOINT
356  * occurring during the rewriteheap operation won't have fsync'd data we
357  * wrote before the checkpoint.
358  */
359  if (RelationNeedsWAL(state->rs_new_rel))
360  heap_sync(state->rs_new_rel);
361 
362  logical_end_heap_rewrite(state);
363 
364  /* Deleting the context frees everything */
365  MemoryContextDelete(state->rs_cxt);
366 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:949
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Relation rs_new_rel
Definition: rewriteheap.c:144
void heap_sync(Relation rel)
Definition: heapam.c:9232
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:469
ItemPointerData t_ctid
Definition: htup_details.h:155
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
MemoryContext rs_cxt
Definition: rewriteheap.c:158
RelFileNode rd_node
Definition: rel.h:85
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1195
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1387
#define RelationNeedsWAL(relation)
Definition: rel.h:514
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1377
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:631
BlockNumber rs_blockno
Definition: rewriteheap.c:146

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1120 of file rewriteheap.c.

References CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf(), xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, write, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1121 {
1122  char path[MAXPGPATH];
1123  int fd;
1124  xl_heap_rewrite_mapping *xlrec;
1125  uint32 len;
1126  char *data;
1127 
1128  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1129 
1130  snprintf(path, MAXPGPATH,
1131  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1132  xlrec->mapped_db, xlrec->mapped_rel,
1133  (uint32) (xlrec->start_lsn >> 32),
1134  (uint32) xlrec->start_lsn,
1135  xlrec->mapped_xid, XLogRecGetXid(r));
1136 
1137  fd = OpenTransientFile(path,
1138  O_CREAT | O_WRONLY | PG_BINARY);
1139  if (fd < 0)
1140  ereport(ERROR,
1141  (errcode_for_file_access(),
1142  errmsg("could not create file \"%s\": %m", path)));
1143 
1144  /*
1145  * Truncate all data that's not guaranteed to have been safely fsynced (by
1146  * previous record or by the last checkpoint).
1147  */
1148  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1149  if (ftruncate(fd, xlrec->offset) != 0)
1150  ereport(ERROR,
1151  (errcode_for_file_access(),
1152  errmsg("could not truncate file \"%s\" to %u: %m",
1153  path, (uint32) xlrec->offset)));
1154  pgstat_report_wait_end();
1155 
1156  /* now seek to the position we want to write our data to */
1157  if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1158  ereport(ERROR,
1159  (errcode_for_file_access(),
1160  errmsg("could not seek to end of file \"%s\": %m",
1161  path)));
1162 
1163  data = XLogRecGetData(r) + sizeof(*xlrec);
1164 
1165  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1166 
1167  /* write out tail end of mapping file (again) */
1168  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1169  if (write(fd, data, len) != len)
1170  ereport(ERROR,
1171  (errcode_for_file_access(),
1172  errmsg("could not write to file \"%s\": %m", path)));
1173  pgstat_report_wait_end();
1174 
1175  /*
1176  * Now fsync all previously written data. We could improve things and only
1177  * do this for the last write to a file, but the required bookkeeping
1178  * doesn't seem worth the trouble.
1179  */
1180  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1181  if (pg_fsync(fd) != 0)
1182  ereport(ERROR,
1183  (errcode_for_file_access(),
1184  errmsg("could not fsync file \"%s\": %m", path)));
1185  pgstat_report_wait_end();
1186 
1187  CloseTransientFile(fd);
1188 }
#define write(a, b, c)
Definition: win32.h:14
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1069
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2403
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:314
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
#define ereport(elevel, rest)
Definition: elog.h:122
int CloseTransientFile(int fd)
Definition: fd.c:2573
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:224
TransactionId mapped_xid
Definition: heapam_xlog.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:348
#define ftruncate(a, b)
Definition: win32_port.h:60

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 802 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

803 {
804  HASHCTL hash_ctl;
805  TransactionId logical_xmin;
806 
807  /*
808  * We only need to persist these mappings if the rewritten table can be
809  * accessed during logical decoding, if not, we can skip doing any
810  * additional work.
811  */
812  state->rs_logical_rewrite =
813  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
814 
815  if (!state->rs_logical_rewrite)
816  return;
817 
818  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
819 
820  /*
821  * If there are no logical slots in progress we don't need to do anything,
822  * there cannot be any remappings for relevant rows yet. The relation's
823  * lock protects us against races.
824  */
825  if (logical_xmin == InvalidTransactionId)
826  {
827  state->rs_logical_rewrite = false;
828  return;
829  }
830 
831  state->rs_logical_xmin = logical_xmin;
832  state->rs_begin_lsn = GetXLogInsertRecPtr();
833  state->rs_num_rewrite_mappings = 0;
834 
835  memset(&hash_ctl, 0, sizeof(hash_ctl));
836  hash_ctl.keysize = sizeof(TransactionId);
837  hash_ctl.entrysize = sizeof(RewriteMappingFile);
838  hash_ctl.hcxt = state->rs_cxt;
839 
840  state->rs_logical_mappings =
841  hash_create("Logical rewrite mapping",
842  128, /* arbitrary initial size */
843  &hash_ctl,
844  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
845 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:463
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11174
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:158
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:568
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:143

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 949 of file rewriteheap.c.

References ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

950 {
951  HASH_SEQ_STATUS seq_status;
952  RewriteMappingFile *src;
953 
954  /* done, no logical rewrite in progress */
955  if (!state->rs_logical_rewrite)
956  return;
957 
958  /* writeout remaining in-memory entries */
959  if (state->rs_num_rewrite_mappings > 0)
960  logical_heap_rewrite_flush_mappings(state);
961 
962  /* Iterate over all mappings we have written and fsync the files. */
963  hash_seq_init(&seq_status, state->rs_logical_mappings);
964  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
965  {
966  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
967  ereport(ERROR,
968  (errcode_for_file_access(),
969  errmsg("could not fsync file \"%s\": %m", src->path)));
970  FileClose(src->vfd);
971  }
972  /* memory context cleanup will deal with the rest */
973 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2079
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void FileClose(File file)
Definition: fd.c:1749
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1387
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1377
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:851
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 851 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

852 {
853  HASH_SEQ_STATUS seq_status;
854  RewriteMappingFile *src;
855  dlist_mutable_iter iter;
856 
857  Assert(state->rs_logical_rewrite);
858 
859  /* no logical rewrite in progress, no need to iterate over mappings */
860  if (state->rs_num_rewrite_mappings == 0)
861  return;
862 
863  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
864  state->rs_num_rewrite_mappings);
865 
866  hash_seq_init(&seq_status, state->rs_logical_mappings);
867  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
868  {
869  char *waldata;
870  char *waldata_start;
871  xl_heap_rewrite_mapping xlrec;
872  Oid dboid;
873  uint32 len;
874  int written;
875 
876  /* this file hasn't got any new mappings */
877  if (src->num_mappings == 0)
878  continue;
879 
880  if (state->rs_old_rel->rd_rel->relisshared)
881  dboid = InvalidOid;
882  else
883  dboid = MyDatabaseId;
884 
885  xlrec.num_mappings = src->num_mappings;
886  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
887  xlrec.mapped_xid = src->xid;
888  xlrec.mapped_db = dboid;
889  xlrec.offset = src->off;
890  xlrec.start_lsn = state->rs_begin_lsn;
891 
892  /* write all mappings consecutively */
893  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
894  waldata_start = waldata = palloc(len);
895 
896  /*
897  * collect data we need to write out, but don't modify ondisk data yet
898  */
899  dlist_foreach_modify(iter, &src->mappings)
900  {
901  RewriteMappingDataEntry *pmap;
902 
903  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
904 
905  memcpy(waldata, &pmap->map, sizeof(pmap->map));
906  waldata += sizeof(pmap->map);
907 
908  /* remove from the list and free */
909  dlist_delete(&pmap->node);
910  pfree(pmap);
911 
912  /* update bookkeeping */
913  state->rs_num_rewrite_mappings--;
914  src->num_mappings--;
915  }
916 
917  Assert(src->num_mappings == 0);
918  Assert(waldata == waldata_start + len);
919 
920  /*
921  * Note that we deviate from the usual WAL coding practices here,
922  * check the above "Logical rewrite support" comment for reasoning.
923  */
924  written = FileWrite(src->vfd, waldata_start, len,
925  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
926  if (written != len)
927  ereport(ERROR,
928  (errcode_for_file_access(),
929  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
930  written, len)));
931  src->off += len;
932 
933  XLogBeginInsert();
934  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
935  XLogRegisterData(waldata_start, len);
936 
937  /* write xlog record */
938  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
939 
940  pfree(waldata_start);
941  }
942  Assert(state->rs_num_rewrite_mappings == 0);
943 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:936
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
TransactionId xid
Definition: rewriteheap.c:205
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:314
int FileWrite(File file, char *buffer, int amount, uint32 wait_event_info)
Definition: fd.c:1958
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId mapped_xid
Definition: heapam_xlog.h:356
#define Assert(condition)
Definition: c.h:688
dlist_head mappings
Definition: rewriteheap.c:209
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1387
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1377
void * palloc(Size size)
Definition: mcxt.c:835
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:143
#define elog
Definition: elog.h:219
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define RelationGetRelid(relation)
Definition: rel.h:425

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1046 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1048 {
1049  ItemPointerData new_tid = new_tuple->t_self;
1050  TransactionId cutoff = state->rs_logical_xmin;
1051  TransactionId xmin;
1052  TransactionId xmax;
1053  bool do_log_xmin = false;
1054  bool do_log_xmax = false;
1055  LogicalRewriteMappingData map;
1056 
1057  /* no logical rewrite in progress, we don't need to log anything */
1058  if (!state->rs_logical_rewrite)
1059  return;
1060 
1061  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1062  /* use *GetUpdateXid to correctly deal with multixacts */
1063  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1064 
1065  /*
1066  * Log the mapping iff the tuple has been created recently.
1067  */
1068  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1069  do_log_xmin = true;
1070 
1071  if (!TransactionIdIsNormal(xmax))
1072  {
1073  /*
1074  * no xmax is set, can't have any permanent ones, so this check is
1075  * sufficient
1076  */
1077  }
1078  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1079  {
1080  /* only locked, we don't care */
1081  }
1082  else if (!TransactionIdPrecedes(xmax, cutoff))
1083  {
1084  /* tuple has been deleted recently, log */
1085  do_log_xmax = true;
1086  }
1087 
1088  /* if neither needs to be logged, we're done */
1089  if (!do_log_xmin && !do_log_xmax)
1090  return;
1091 
1092  /* fill out mapping information */
1093  map.old_node = state->rs_old_rel->rd_node;
1094  map.old_tid = old_tid;
1095  map.new_node = state->rs_new_rel->rd_node;
1096  map.new_tid = new_tid;
1097 
1098  /* ---
1099  * Now persist the mapping for the individual xids that are affected. We
1100  * need to log for both xmin and xmax if they aren't the same transaction
1101  * since the mapping files are per "affected" xid.
1102  * We don't muster all that much effort detecting whether xmin and xmax
1103  * are actually the same transaction, we just check whether the xid is the
1104  * same disregarding subtransactions. Logging too much is relatively
1105  * harmless and we could never do the check fully since subtransaction
1106  * data is thrown away during restarts.
1107  * ---
1108  */
1109  if (do_log_xmin)
1110  logical_rewrite_log_mapping(state, xmin, &map);
1111  /* separately log mapping for xmax unless it'd be redundant */
1112  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1113  logical_rewrite_log_mapping(state, xmax, &map);
1114 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:364
TransactionId rs_logical_xmin
Definition: rewriteheap.c:154
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:463
Relation rs_new_rel
Definition: rewriteheap.c:144
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:979
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:221
RelFileNode rd_node
Definition: rel.h:85
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:312
Relation rs_old_rel
Definition: rewriteheap.c:143
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData *  map 
)
static

Definition at line 979 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf(), and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

981 {
982  RewriteMappingFile *src;
983  RewriteMappingDataEntry *pmap; /* new entry, appended to src->mappings below */
984  Oid relid;
985  bool found;
986 
987  relid = RelationGetRelid(state->rs_old_rel);
988 
989  /* look for existing mappings for this 'mapped' xid */
990  src = hash_search(state->rs_logical_mappings, &xid,
991  HASH_ENTER, &found);
992 
993  /*
994  * We haven't yet had the need to map anything for this xid, create
995  * per-xid data structures.
996  */
997  if (!found)
998  {
999  char path[MAXPGPATH];
1000  Oid dboid;
1001 
1002  if (state->rs_old_rel->rd_rel->relisshared)
1003  dboid = InvalidOid;
1004  else
1005  dboid = MyDatabaseId;
1006 
1007  snprintf(path, MAXPGPATH,
1008  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1009  dboid, relid,
1010  (uint32) (state->rs_begin_lsn >> 32),
1011  (uint32) state->rs_begin_lsn,
1012  xid, GetCurrentTransactionId());
1013 
1014  dlist_init(&src->mappings);
1015  src->num_mappings = 0;
1016  src->off = 0;
1017  memcpy(src->path, path, sizeof(path));
1018  src->vfd = PathNameOpenFile(path,
1019  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1020  if (src->vfd < 0)
1021  ereport(ERROR,
1022  (errcode_for_file_access(),
1023  errmsg("could not create file \"%s\": %m", path)));
1024  }
1025 
1026  pmap = MemoryContextAlloc(state->rs_cxt,
1027  sizeof(RewriteMappingDataEntry));
1028  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1029  dlist_push_tail(&src->mappings, &pmap->node);
1030  src->num_mappings++;
1031  state->rs_num_rewrite_mappings++;
1032 
1033  /*
1034  * Write out buffer every time we've too many in-memory entries across all
1035  * mapping files.
1036  */
1037  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1038  logical_heap_rewrite_flush_mappings(state);
1039 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1358
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1069
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:164
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:210
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:314
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:160
HTAB * rs_logical_mappings
Definition: rewriteheap.c:163
MemoryContext rs_cxt
Definition: rewriteheap.c:158
Oid MyDatabaseId
Definition: globals.c:77
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
dlist_head mappings
Definition: rewriteheap.c:209
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:851
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:797
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:693
LogicalRewriteMappingData map
Definition: rewriteheap.c:219
Relation rs_old_rel
Definition: rewriteheap.c:143
#define RelationGetRelid(relation)
Definition: rel.h:425

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 631 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_SKIP_FSM, HEAP_INSERT_SKIP_WAL, HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationOpenSmgr, RELKIND_TOASTVALUE, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, RewriteStateData::rs_use_wal, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, toast_insert_or_update(), and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

632 {
633  Page page = state->rs_buffer;
634  Size pageFreeSpace,
635  saveFreeSpace;
636  Size len;
637  OffsetNumber newoff;
638  HeapTuple heaptup;
639 
640  /*
641  * If the new tuple is too big for storage or contains already toasted
642  * out-of-line attributes from some other relation, invoke the toaster.
643  *
644  * Note: below this point, heaptup is the data we actually intend to store
645  * into the relation; tup is the caller's original untoasted data.
646  */
647  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
648  {
649  /* toast table entries should never be recursively toasted */
650  Assert(!HeapTupleHasExternal(tup));
651  heaptup = tup;
652  }
653  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
654  heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
655  HEAP_INSERT_SKIP_FSM |
656  (state->rs_use_wal ?
657  0 : HEAP_INSERT_SKIP_WAL));
658  else
659  heaptup = tup;
660 
661  len = MAXALIGN(heaptup->t_len); /* be conservative */
662 
663  /*
664  * If we're gonna fail for oversize tuple, do it right away
665  */
666  if (len > MaxHeapTupleSize)
667  ereport(ERROR,
668  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
669  errmsg("row is too big: size %zu, maximum size %zu",
670  len, MaxHeapTupleSize)));
671 
672  /* Compute desired extra freespace due to fillfactor option */
673  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
674  HEAP_DEFAULT_FILLFACTOR);
675 
676  /* Now we can check to see if there's enough free space already. */
677  if (state->rs_buffer_valid)
678  {
679  pageFreeSpace = PageGetHeapFreeSpace(page);
680 
681  if (len + saveFreeSpace > pageFreeSpace)
682  {
683  /* Doesn't fit, so write out the existing page */
684 
685  /* XLOG stuff */
686  if (state->rs_use_wal)
687  log_newpage(&state->rs_new_rel->rd_node,
688  MAIN_FORKNUM,
689  state->rs_blockno,
690  page,
691  true);
692 
693  /*
694  * Now write the page. We say isTemp = true even if it's not a
695  * temp table, because there's no need for smgr to schedule an
696  * fsync for this write; we'll do it ourselves in
697  * end_heap_rewrite.
698  */
699  RelationOpenSmgr(state->rs_new_rel); /* rd_smgr is dereferenced by smgrextend() below */
700 
701  PageSetChecksumInplace(page, state->rs_blockno);
702 
703  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
704  state->rs_blockno, (char *) page, true);
705 
706  state->rs_blockno++;
707  state->rs_buffer_valid = false;
708  }
709  }
710 
711  if (!state->rs_buffer_valid)
712  {
713  /* Initialize a new empty page */
714  PageInit(page, BLCKSZ, 0);
715  state->rs_buffer_valid = true;
716  }
717 
718  /* And now we can insert the tuple into the page */
719  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
720  InvalidOffsetNumber, false, true);
721  if (newoff == InvalidOffsetNumber)
722  elog(ERROR, "failed to add tuple");
723 
724  /* Update caller's t_self to the actual position where it was stored */
725  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
726 
727  /*
728  * Insert the correct position into CTID of the stored tuple, too, if the
729  * caller didn't supply a valid CTID.
730  */
731  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
732  {
733  ItemId newitemid;
734  HeapTupleHeader onpage_tup;
735 
736  newitemid = PageGetItemId(page, newoff);
737  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
738 
739  onpage_tup->t_ctid = tup->t_self;
740  }
741 
742  /* If heaptup is a private copy, release it. */
743  if (heaptup != tup)
744  heap_freetuple(heaptup);
745 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:60
#define TOAST_TUPLE_THRESHOLD
Definition: tuptoaster.h:55
HeapTuple toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: tuptoaster.c:534
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:87
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:144
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:412
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:67
#define RelationOpenSmgr(relation)
Definition: rel.h:469
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:662
ItemPointerData t_ctid
Definition: htup_details.h:155
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:566
#define ereport(elevel, rest)
Definition: elog.h:122
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:316
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:231
#define RELKIND_TOASTVALUE
Definition: pg_class.h:163
#define InvalidOffsetNumber
Definition: off.h:26
RelFileNode rd_node
Definition: rel.h:85
#define Assert(condition)
Definition: c.h:688
size_t Size
Definition: c.h:422
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1195
#define MAXALIGN(LEN)
Definition: c.h:641
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:600
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:679
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:972
#define elog
Definition: elog.h:219
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:287
BlockNumber rs_blockno
Definition: rewriteheap.c:146
#define PageGetItem(page, itemId)
Definition: bufpage.h:336
Pointer Page
Definition: bufpage.h:74
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:105
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:41

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 581 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by copy_heap_data().

582 { /* Returns true iff an unresolved entry keyed on this tuple's (xmin, tid) was found and removed. */
583  /*
584  * If we have already seen an earlier tuple in the update chain that
585  * points to this tuple, let's forget about that earlier tuple. It's in
586  * fact dead as well, our simple xmax < OldestXmin test in
587  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
588  * when xmin of a tuple is greater than xmax, which sounds
589  * counter-intuitive but is perfectly valid.
590  *
591  * We don't bother to try to detect the situation the other way round,
592  * when we encounter the dead tuple first and then the recently dead one
593  * that points to it. If that happens, we'll have some unmatched entries
594  * in the UnresolvedTups hash table at the end. That can happen anyway,
595  * because a vacuum might have removed the dead tuple in the chain before
596  * us.
597  */
598  UnresolvedTup unresolved;
599  TidHashKey hashkey;
600  bool found;
601 
602  memset(&hashkey, 0, sizeof(hashkey)); /* NOTE(review): presumably zeroes padding so the key compares/hashes consistently; confirm against the hash table setup */
603  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
604  hashkey.tid = old_tuple->t_self;
605 
606  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
607  HASH_FIND, NULL);
608 
609  if (unresolved != NULL)
610  {
611  /* Need to free the contained tuple as well as the hashtable entry */
612  heap_freetuple(unresolved->tuple);
613  hash_search(state->rs_unresolved_tups, &hashkey,
614  HASH_REMOVE, &found);
615  Assert(found);
616  return true; /* the stashed earlier chain member was discarded */
617  }
618 
619  return false; /* no unresolved tuple was waiting on this one */
620 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
HeapTupleHeader t_data
Definition: htup.h:67
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:688
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:312
TransactionId xmin
Definition: rewriteheap.c:175

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 380 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

382 {
383  MemoryContext old_cxt;
384  ItemPointerData old_tid;
385  TidHashKey hashkey;
386  bool found;
387  bool free_new;
388 
389  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
390 
391  /*
392  * Copy the original tuple's visibility information into new_tuple.
393  *
394  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
395  * we intentionally clear the HOT status bits.
396  */
397  memcpy(&new_tuple->t_data->t_choice.t_heap,
398  &old_tuple->t_data->t_choice.t_heap,
399  sizeof(HeapTupleFields));
400 
401  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
402  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
403  new_tuple->t_data->t_infomask |=
404  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
405 
406  /*
407  * While we have our hands on the tuple, we may as well freeze any
408  * eligible xmin or xmax, so that future VACUUM effort can be saved.
409  */
410  heap_freeze_tuple(new_tuple->t_data,
411  state->rs_old_rel->rd_rel->relfrozenxid,
412  state->rs_old_rel->rd_rel->relminmxid,
413  state->rs_freeze_xid,
414  state->rs_cutoff_multi);
415 
416  /*
417  * Invalid ctid means that ctid should point to the tuple itself. We'll
418  * override it later if the tuple is part of an update chain.
419  */
420  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
421 
422  /*
423  * If the tuple has been updated, check the old-to-new mapping hash table.
424  */
425  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
426  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
427  !(ItemPointerEquals(&(old_tuple->t_self),
428  &(old_tuple->t_data->t_ctid))))
429  {
430  OldToNewMapping mapping;
431 
432  memset(&hashkey, 0, sizeof(hashkey));
433  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
434  hashkey.tid = old_tuple->t_data->t_ctid;
435 
436  mapping = (OldToNewMapping)
437  hash_search(state->rs_old_new_tid_map, &hashkey,
438  HASH_FIND, NULL);
439 
440  if (mapping != NULL)
441  {
442  /*
443  * We've already copied the tuple that t_ctid points to, so we can
444  * set the ctid of this tuple to point to the new location, and
445  * insert it right away.
446  */
447  new_tuple->t_data->t_ctid = mapping->new_tid;
448 
449  /* We don't need the mapping entry anymore */
450  hash_search(state->rs_old_new_tid_map, &hashkey,
451  HASH_REMOVE, &found);
452  Assert(found);
453  }
454  else
455  {
456  /*
457  * We haven't seen the tuple t_ctid points to yet. Stash this
458  * tuple into unresolved_tups to be written later.
459  */
460  UnresolvedTup unresolved;
461 
462  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
463  HASH_ENTER, &found);
464  Assert(!found);
465 
466  unresolved->old_tid = old_tuple->t_self;
467  unresolved->tuple = heap_copytuple(new_tuple);
468 
469  /*
470  * We can't do anything more now, since we don't know where the
471  * tuple will be written.
472  */
473  MemoryContextSwitchTo(old_cxt);
474  return;
475  }
476  }
477 
478  /*
479  * Now we will write the tuple, and then check to see if it is the B tuple
480  * in any new or known pair. When we resolve a known pair, we will be
481  * able to write that pair's A tuple, and then we have to check if it
482  * resolves some other pair. Hence, we need a loop here.
483  */
484  old_tid = old_tuple->t_self;
485  free_new = false;
486 
487  for (;;)
488  {
489  ItemPointerData new_tid;
490 
491  /* Insert the tuple and find out where it's put in new_heap */
492  raw_heap_insert(state, new_tuple);
493  new_tid = new_tuple->t_self;
494 
495  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
496 
497  /*
498  * If the tuple is the updated version of a row, and the prior version
499  * wouldn't be DEAD yet, then we need to either resolve the prior
500  * version (if it's waiting in rs_unresolved_tups), or make an entry
501  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
502  * previous tuple's xmax would equal this one's xmin, so it's
503  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
504  */
505  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
506  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
507  state->rs_oldest_xmin))
508  {
509  /*
510  * Okay, this is B in an update pair. See if we've seen A.
511  */
512  UnresolvedTup unresolved;
513 
514  memset(&hashkey, 0, sizeof(hashkey));
515  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
516  hashkey.tid = old_tid;
517 
518  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
519  HASH_FIND, NULL);
520 
521  if (unresolved != NULL)
522  {
523  /*
524  * We have seen and memorized the previous tuple already. Now
525  * that we know where we inserted the tuple its t_ctid points
526  * to, fix its t_ctid and insert it to the new heap.
527  */
528  if (free_new)
529  heap_freetuple(new_tuple);
530  new_tuple = unresolved->tuple;
531  free_new = true;
532  old_tid = unresolved->old_tid;
533  new_tuple->t_data->t_ctid = new_tid;
534 
535  /*
536  * We don't need the hash entry anymore, but don't free its
537  * tuple just yet.
538  */
539  hash_search(state->rs_unresolved_tups, &hashkey,
540  HASH_REMOVE, &found);
541  Assert(found);
542 
543  /* loop back to insert the previous tuple in the chain */
544  continue;
545  }
546  else
547  {
548  /*
549  * Remember the new tid of this tuple. We'll use it to set the
550  * ctid when we find the previous tuple in the chain.
551  */
552  OldToNewMapping mapping;
553 
554  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
555  HASH_ENTER, &found);
556  Assert(!found);
557 
558  mapping->new_tid = new_tid;
559  }
560  }
561 
562  /* Done with this (chain of) tuples, for now */
563  if (free_new)
564  heap_freetuple(new_tuple);
565  break;
566  }
567 
568  MemoryContextSwitchTo(old_cxt);
569 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:364
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
Definition: tqual.c:1596
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:611
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:151
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1046
TransactionId rs_freeze_xid
Definition: rewriteheap.c:152
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:274
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
#define HEAP_UPDATED
Definition: htup_details.h:200
ItemPointerData old_tid
Definition: rewriteheap.c:185
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData new_tid
Definition: rewriteheap.c:194
#define HEAP_XMAX_INVALID
Definition: htup_details.h:198
ItemPointerData t_ctid
Definition: htup_details.h:155
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:161
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6914
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:150
MemoryContext rs_cxt
Definition: rewriteheap.c:158
ItemPointerData tid
Definition: rewriteheap.c:176
#define Assert(condition)
Definition: c.h:688
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:162
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:312
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:150
TransactionId xmin
Definition: rewriteheap.c:175
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:631
#define HEAP_XACT_MASK
Definition: htup_details.h:209
Relation rs_old_rel
Definition: rewriteheap.c:143
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:197