PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 187 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 179 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 237 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, and RewriteStateData::rs_unresolved_tups.

Referenced by heapam_relation_copy_for_cluster().

239 {
240  RewriteState state;
241  MemoryContext rw_cxt;
242  MemoryContext old_cxt;
243  HASHCTL hash_ctl;
244 
245  /*
246  * To ease cleanup, make a separate context that will contain the
247  * RewriteState struct itself plus all subsidiary data.
248  */
249  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
250  "Table rewrite",
251  ALLOCSET_DEFAULT_SIZES);
252  old_cxt = MemoryContextSwitchTo(rw_cxt);
253 
254  /* Create and fill in the state struct */
255  state = palloc0(sizeof(RewriteStateData));
256 
257  state->rs_old_rel = old_heap;
258  state->rs_new_rel = new_heap;
259  state->rs_buffer = (Page) palloc(BLCKSZ);
260  /* new_heap needn't be empty, just locked */
261  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
262  state->rs_buffer_valid = false;
263  state->rs_oldest_xmin = oldest_xmin;
264  state->rs_freeze_xid = freeze_xid;
265  state->rs_cutoff_multi = cutoff_multi;
266  state->rs_cxt = rw_cxt;
267 
268  /* Initialize hash tables used to track update chains */
269  memset(&hash_ctl, 0, sizeof(hash_ctl));
270  hash_ctl.keysize = sizeof(TidHashKey);
271  hash_ctl.entrysize = sizeof(UnresolvedTupData);
272  hash_ctl.hcxt = state->rs_cxt;
273 
274  state->rs_unresolved_tups =
275  hash_create("Rewrite / Unresolved ctids",
276  128, /* arbitrary initial size */
277  &hash_ctl,
278  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
279 
280  hash_ctl.entrysize = sizeof(OldToNewMappingData);
281 
282  state->rs_old_new_tid_map =
283  hash_create("Rewrite / Old to new tid map",
284  128, /* arbitrary initial size */
285  &hash_ctl,
286  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
287 
288  MemoryContextSwitchTo(old_cxt);
289 
290  logical_begin_heap_rewrite(state);
291 
292  return state;
293 }
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
Relation rs_new_rel
Definition: rewriteheap.c:135
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:148
void * palloc0(Size size)
Definition: mcxt.c:980
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
Size keysize
Definition: hsearch.h:72
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:211
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:794
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
Definition: regguts.h:298
void * palloc(Size size)
Definition: mcxt.c:949
Relation rs_old_rel
Definition: rewriteheap.c:134
BlockNumber rs_blockno
Definition: rewriteheap.c:137
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1195 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1196 {
1197  XLogRecPtr cutoff;
1198  XLogRecPtr redo;
1199  DIR *mappings_dir;
1200  struct dirent *mapping_de;
1201  char path[MAXPGPATH + 20];
1202 
1203  /*
1204  * We start off with a minimum of the last redo pointer. No new decoding
1205  * slot will start before that, so that's a safe upper bound for removal.
1206  */
1207  redo = GetRedoRecPtr();
1208 
1209  /* now check for the restart ptrs from existing slots */
1210  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1211 
1212  /* don't start earlier than the restart lsn */
1213  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1214  cutoff = redo;
1215 
1216  mappings_dir = AllocateDir("pg_logical/mappings");
1217  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1218  {
1219  struct stat statbuf;
1220  Oid dboid;
1221  Oid relid;
1222  XLogRecPtr lsn;
1223  TransactionId rewrite_xid;
1224  TransactionId create_xid;
1225  uint32 hi,
1226  lo;
1227 
1228  if (strcmp(mapping_de->d_name, ".") == 0 ||
1229  strcmp(mapping_de->d_name, "..") == 0)
1230  continue;
1231 
1232  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1233  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1234  continue;
1235 
1236  /* Skip over files that cannot be ours. */
1237  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1238  continue;
1239 
1240  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1241  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1242  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1243 
1244  lsn = ((uint64) hi) << 32 | lo;
1245 
1246  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1247  {
1248  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1249  if (unlink(path) < 0)
1250  ereport(ERROR,
1252  errmsg("could not remove file \"%s\": %m", path)));
1253  }
1254  else
1255  {
1256  /* on some operating systems fsyncing a file requires O_RDWR */
1257  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1258 
1259  /*
1260  * The file cannot vanish due to concurrency since this function
1261  * is the only one removing logical mappings and it's run while
1262  * CheckpointLock is held exclusively.
1263  */
1264  if (fd < 0)
1265  ereport(ERROR,
1267  errmsg("could not open file \"%s\": %m", path)));
1268 
1269  /*
1270  * We could try to avoid fsyncing files that either haven't
1271  * changed or have only been created since the checkpoint's start,
1272  * but it's currently not deemed worth the effort.
1273  */
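1274  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1275  if (pg_fsync(fd) != 0)
1276  ereport(data_sync_elevel(ERROR),
1277  (errcode_for_file_access(),
1278  errmsg("could not fsync file \"%s\": %m", path)));
1279  pgstat_report_wait_end();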
1280 
1281  if (CloseTransientFile(fd) != 0)
1282  ereport(ERROR,
1284  errmsg("could not close file \"%s\": %m", path)));
1285  }
1286  }
1287  FreeDir(mappings_dir);
1288 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:513
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1234
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2370
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:804
unsigned int uint32
Definition: c.h:367
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2581
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1380
#define S_ISREG(m)
Definition: win32_port.h:299
int CloseTransientFile(int fd)
Definition: fd.c:2547
#define stat(a, b)
Definition: win32_port.h:255
int data_sync_elevel(int elevel)
Definition: fd.c:3597
#define ereport(elevel,...)
Definition: elog.h:144
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2647
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1356
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8361
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:244
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
int pg_fsync(int fd)
Definition: fd.c:343
char d_name[MAX_PATH]
Definition: dirent.h:14
#define snprintf
Definition: port.h:193
int FreeDir(DIR *dir)
Definition: fd.c:2699

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 301 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, smgrextend(), smgrimmedsync(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

302 {
303  HASH_SEQ_STATUS seq_status;
304  UnresolvedTup unresolved;
305 
306  /*
307  * Write any remaining tuples in the UnresolvedTups table. If we have any
308  * left, they should in fact be dead, but let's err on the safe side.
309  */
310  hash_seq_init(&seq_status, state->rs_unresolved_tups);
311 
312  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
313  {
314  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
315  raw_heap_insert(state, unresolved->tuple);
316  }
317 
318  /* Write the last page, if any */
319  if (state->rs_buffer_valid)
320  {
321  if (RelationNeedsWAL(state->rs_new_rel))
322  log_newpage(&state->rs_new_rel->rd_node,
323  MAIN_FORKNUM,
324  state->rs_blockno,
325  state->rs_buffer,
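326  true);
327  RelationOpenSmgr(state->rs_new_rel);
328 
329  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
330 
331  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
332  (char *) state->rs_buffer, true);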
333  }
334 
335  /*
336  * When we WAL-logged rel pages, we must nonetheless fsync them. The
337  * reason is the same as in storage.c's RelationCopyStorage(): we're
338  * writing data that's not in shared buffers, and so a CHECKPOINT
339  * occurring during the rewriteheap operation won't have fsync'd data we
340  * wrote before the checkpoint.
341  */
342  if (RelationNeedsWAL(state->rs_new_rel))
343  smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM);
344 
345  logical_end_heap_rewrite(state);
346 
347  /* Deleting the context frees everything */
348  MemoryContextDelete(state->rs_cxt);
349 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:941
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:513
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
MemoryContext rs_cxt
Definition: rewriteheap.c:148
RelFileNode rd_node
Definition: rel.h:55
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1194
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1391
#define RelationNeedsWAL(relation)
Definition: rel.h:562
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1381
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:977
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:615
BlockNumber rs_blockno
Definition: rewriteheap.c:137
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:620

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState *  r)

Definition at line 1112 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1113 {
1114  char path[MAXPGPATH];
1115  int fd;
1116  xl_heap_rewrite_mapping *xlrec;
1117  uint32 len;
1118  char *data;
1119 
1120  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1121 
1122  snprintf(path, MAXPGPATH,
1123  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1124  xlrec->mapped_db, xlrec->mapped_rel,
1125  (uint32) (xlrec->start_lsn >> 32),
1126  (uint32) xlrec->start_lsn,
1127  xlrec->mapped_xid, XLogRecGetXid(r));
1128 
1129  fd = OpenTransientFile(path,
1130  O_CREAT | O_WRONLY | PG_BINARY);
1131  if (fd < 0)
1132  ereport(ERROR,
1134  errmsg("could not create file \"%s\": %m", path)));
1135 
1136  /*
1137  * Truncate all data that's not guaranteed to have been safely fsynced (by
1138  * previous record or by the last checkpoint).
1139  */
1141  if (ftruncate(fd, xlrec->offset) != 0)
1142  ereport(ERROR,
1144  errmsg("could not truncate file \"%s\" to %u: %m",
1145  path, (uint32) xlrec->offset)));
1147 
1148  data = XLogRecGetData(r) + sizeof(*xlrec);
1149 
1150  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1151 
1152  /* write out tail end of mapping file (again) */
1153  errno = 0;
1155  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1156  {
1157  /* if write didn't set errno, assume problem is no disk space */
1158  if (errno == 0)
1159  errno = ENOSPC;
1160  ereport(ERROR,
1162  errmsg("could not write to file \"%s\": %m", path)));
1163  }
1165 
1166  /*
1167  * Now fsync all previously written data. We could improve things and only
1168  * do this for the last write to a file, but the required bookkeeping
1169  * doesn't seem worth the trouble.
1170  */
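1171  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1172  if (pg_fsync(fd) != 0)
1173  ereport(data_sync_elevel(ERROR),
1174  (errcode_for_file_access(),
1175  errmsg("could not fsync file \"%s\": %m", path)));
1176  pgstat_report_wait_end();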
1177 
1178  if (CloseTransientFile(fd) != 0)
1179  ereport(ERROR,
1181  errmsg("could not close file \"%s\": %m", path)));
1182 }
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1234
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
#define XLogRecGetData(decoder)
Definition: xlogreader.h:311
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2370
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
unsigned int uint32
Definition: c.h:367
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1380
int CloseTransientFile(int fd)
Definition: fd.c:2547
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:309
int data_sync_elevel(int elevel)
Definition: fd.c:3597
#define ereport(elevel,...)
Definition: elog.h:144
TransactionId mapped_xid
Definition: heapam_xlog.h:378
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1356
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:824
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:343
#define snprintf
Definition: port.h:193
#define ftruncate(a, b)
Definition: win32_port.h:59

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 794 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

795 {
796  HASHCTL hash_ctl;
797  TransactionId logical_xmin;
798 
799  /*
800  * We only need to persist these mappings if the rewritten table can be
801  * accessed during logical decoding, if not, we can skip doing any
802  * additional work.
803  */
804  state->rs_logical_rewrite =
805  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
806 
807  if (!state->rs_logical_rewrite)
808  return;
809 
810  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
811 
812  /*
813  * If there are no logical slots in progress we don't need to do anything,
814  * there cannot be any remappings for relevant rows yet. The relation's
815  * lock protects us against races.
816  */
817  if (logical_xmin == InvalidTransactionId)
818  {
819  state->rs_logical_rewrite = false;
820  return;
821  }
822 
823  state->rs_logical_xmin = logical_xmin;
824  state->rs_begin_lsn = GetXLogInsertRecPtr();
825  state->rs_num_rewrite_mappings = 0;
826 
827  memset(&hash_ctl, 0, sizeof(hash_ctl));
828  hash_ctl.keysize = sizeof(TransactionId);
829  hash_ctl.entrysize = sizeof(RewriteMappingFile);
830  hash_ctl.hcxt = state->rs_cxt;
831 
832  state->rs_logical_mappings =
833  hash_create("Logical rewrite mapping",
834  128, /* arbitrary initial size */
835  &hash_ctl,
836  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
837 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
uint32 TransactionId
Definition: c.h:513
MemoryContext hcxt
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:73
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3114
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11482
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
#define HASH_BLOBS
Definition: hsearch.h:88
MemoryContext rs_cxt
Definition: rewriteheap.c:148
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:619
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
Size keysize
Definition: hsearch.h:72
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:134

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 941 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

942 {
943  HASH_SEQ_STATUS seq_status;
944  RewriteMappingFile *src;
945 
946  /* done, no logical rewrite in progress */
947  if (!state->rs_logical_rewrite)
948  return;
949 
950  /* writeout remaining in-memory entries */
951  if (state->rs_num_rewrite_mappings > 0)
952  logical_heap_rewrite_flush_mappings(state);
953 
954  /* Iterate over all mappings we have written and fsync the files. */
955  hash_seq_init(&seq_status, state->rs_logical_mappings);
956  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
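957  {
958  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
959  ereport(data_sync_elevel(ERROR),
960  (errcode_for_file_access(),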
961  errmsg("could not fsync file \"%s\": %m", src->path)));
962  FileClose(src->vfd);
963  }
964  /* memory context cleanup will deal with the rest */
965 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:200
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2125
int errcode_for_file_access(void)
Definition: elog.c:633
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
int data_sync_elevel(int elevel)
Definition: fd.c:3597
#define ereport(elevel,...)
Definition: elog.h:144
void FileClose(File file)
Definition: fd.c:1824
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1391
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1381
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:843
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 843 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

844 {
845  HASH_SEQ_STATUS seq_status;
846  RewriteMappingFile *src;
847  dlist_mutable_iter iter;
848 
849  Assert(state->rs_logical_rewrite);
850 
851  /* no logical rewrite in progress, no need to iterate over mappings */
852  if (state->rs_num_rewrite_mappings == 0)
853  return;
854 
855  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
856  state->rs_num_rewrite_mappings);
857 
858  hash_seq_init(&seq_status, state->rs_logical_mappings);
859  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
860  {
861  char *waldata;
862  char *waldata_start;
863  xl_heap_rewrite_mapping xlrec;
864  Oid dboid;
865  uint32 len;
866  int written;
867 
868  /* this file hasn't got any new mappings */
869  if (src->num_mappings == 0)
870  continue;
871 
872  if (state->rs_old_rel->rd_rel->relisshared)
873  dboid = InvalidOid;
874  else
875  dboid = MyDatabaseId;
876 
877  xlrec.num_mappings = src->num_mappings;
878  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
879  xlrec.mapped_xid = src->xid;
880  xlrec.mapped_db = dboid;
881  xlrec.offset = src->off;
882  xlrec.start_lsn = state->rs_begin_lsn;
883 
884  /* write all mappings consecutively */
885  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
886  waldata_start = waldata = palloc(len);
887 
888  /*
889  * collect data we need to write out, but don't modify ondisk data yet
890  */
891  dlist_foreach_modify(iter, &src->mappings)
892  {
893  RewriteMappingDataEntry *pmap;
894 
895  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
896 
897  memcpy(waldata, &pmap->map, sizeof(pmap->map));
898  waldata += sizeof(pmap->map);
899 
900  /* remove from the list and free */
901  dlist_delete(&pmap->node);
902  pfree(pmap);
903 
904  /* update bookkeeping */
905  state->rs_num_rewrite_mappings--;
906  src->num_mappings--;
907  }
908 
909  Assert(src->num_mappings == 0);
910  Assert(waldata == waldata_start + len);
911 
912  /*
913  * Note that we deviate from the usual WAL coding practices here,
914  * check the above "Logical rewrite support" comment for reasoning.
915  */
916  written = FileWrite(src->vfd, waldata_start, len, src->off,
918  if (written != len)
919  ereport(ERROR,
921  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
922  written, len)));
923  src->off += len;
924 
925  XLogBeginInsert();
926  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
927  XLogRegisterData(waldata_start, len);
928 
929  /* write xlog record */
930  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
931 
932  pfree(waldata_start);
933  }
934  Assert(state->rs_num_rewrite_mappings == 0);
935 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:200
TransactionId xid
Definition: rewriteheap.c:195
int errcode_for_file_access(void)
Definition: elog.c:633
unsigned int uint32
Definition: c.h:367
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:324
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:416
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2027
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
TransactionId mapped_xid
Definition: heapam_xlog.h:378
#define Assert(condition)
Definition: c.h:738
dlist_head mappings
Definition: rewriteheap.c:199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1391
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1381
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
#define elog(elevel,...)
Definition: elog.h:214
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:134
void XLogBeginInsert(void)
Definition: xloginsert.c:121
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1038 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1040 {
1041  ItemPointerData new_tid = new_tuple->t_self;
1042  TransactionId cutoff = state->rs_logical_xmin;
1043  TransactionId xmin;
1044  TransactionId xmax;
1045  bool do_log_xmin = false;
1046  bool do_log_xmax = false;
1047  LogicalRewriteMappingData map;
1048 
1049  /* no logical rewrite in progress, we don't need to log anything */
1050  if (!state->rs_logical_rewrite)
1051  return;
1052 
1053  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1054  /* use *GetUpdateXid to correctly deal with multixacts */
1055  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1056 
1057  /*
1058  * Log the mapping iff the tuple has been created recently.
1059  */
1060  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1061  do_log_xmin = true;
1062 
1063  if (!TransactionIdIsNormal(xmax))
1064  {
1065  /*
1066  * no xmax is set, can't have any permanent ones, so this check is
1067  * sufficient
1068  */
1069  }
1070  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1071  {
1072  /* only locked, we don't care */
1073  }
1074  else if (!TransactionIdPrecedes(xmax, cutoff))
1075  {
1076  /* tuple has been deleted recently, log */
1077  do_log_xmax = true;
1078  }
1079 
1080  /* if neither needs to be logged, we're done */
1081  if (!do_log_xmin && !do_log_xmax)
1082  return;
1083 
1084  /* fill out mapping information */
1085  map.old_node = state->rs_old_rel->rd_node;
1086  map.old_tid = old_tid;
1087  map.new_node = state->rs_new_rel->rd_node;
1088  map.new_tid = new_tid;
1089 
1090  /* ---
1091  * Now persist the mapping for the individual xids that are affected. We
1092  * need to log for both xmin and xmax if they aren't the same transaction
1093  * since the mapping files are per "affected" xid.
1094  * We don't muster all that much effort detecting whether xmin and xmax
1095  * are actually the same transaction, we just check whether the xid is the
1096  * same disregarding subtransactions. Logging too much is relatively
1097  * harmless and we could never do the check fully since subtransaction
1098  * data is thrown away during restarts.
1099  * ---
1100  */
1101  if (do_log_xmin)
1102  logical_rewrite_log_mapping(state, xmin, &map);
1103  /* separately log mapping for xmax unless it'd be redundant */
1104  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1105  logical_rewrite_log_mapping(state, xmax, &map);
1106 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:513
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:971
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:55
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:134
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 971 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

973 {
974  RewriteMappingFile *src;
975  RewriteMappingDataEntry *pmap;
976  Oid relid;
977  bool found;
978 
979  relid = RelationGetRelid(state->rs_old_rel);
980 
981  /* look for existing mappings for this 'mapped' xid */
982  src = hash_search(state->rs_logical_mappings, &xid,
983  HASH_ENTER, &found);
984 
985  /*
986  * We haven't yet had the need to map anything for this xid, create
987  * per-xid data structures.
988  */
989  if (!found)
990  {
991  char path[MAXPGPATH];
992  Oid dboid;
993 
994  if (state->rs_old_rel->rd_rel->relisshared)
995  dboid = InvalidOid;
996  else
997  dboid = MyDatabaseId;
998 
999  snprintf(path, MAXPGPATH,
1000  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1001  dboid, relid,
1002  (uint32) (state->rs_begin_lsn >> 32),
1003  (uint32) state->rs_begin_lsn,
1004  xid, GetCurrentTransactionId());
1005 
1006  dlist_init(&src->mappings);
1007  src->num_mappings = 0;
1008  src->off = 0;
1009  memcpy(src->path, path, sizeof(path));
1010  src->vfd = PathNameOpenFile(path,
1011  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1012  if (src->vfd < 0)
1013  ereport(ERROR,
1014  (errcode_for_file_access(),
1015  errmsg("could not create file \"%s\": %m", path)));
1016  }
1017 
1018  pmap = MemoryContextAlloc(state->rs_cxt,
1019  sizeof(RewriteMappingDataEntry));
1020  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1021  dlist_push_tail(&src->mappings, &pmap->node);
1022  src->num_mappings++;
1023  state->rs_num_rewrite_mappings++;
1024 
1025  /*
1026  * Write out buffer every time we've too many in-memory entries across all
1027  * mapping files.
1028  */
1029  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1030  logical_heap_rewrite_flush_mappings(state);
1031 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1434
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1234
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:43
char path[MAXPGPATH]
Definition: rewriteheap.c:200
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
int errcode_for_file_access(void)
Definition: elog.c:633
unsigned int uint32
Definition: c.h:367
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
MemoryContext rs_cxt
Definition: rewriteheap.c:148
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define ereport(elevel,...)
Definition: elog.h:144
dlist_head mappings
Definition: rewriteheap.c:199
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:843
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:824
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
Relation rs_old_rel
Definition: rewriteheap.c:134
#define snprintf
Definition: port.h:193
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 615 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

616 {
617  Page page = state->rs_buffer;
618  Size pageFreeSpace,
619  saveFreeSpace;
620  Size len;
621  OffsetNumber newoff;
622  HeapTuple heaptup;
623 
624  /*
625  * If the new tuple is too big for storage or contains already toasted
626  * out-of-line attributes from some other relation, invoke the toaster.
627  *
628  * Note: below this point, heaptup is the data we actually intend to store
629  * into the relation; tup is the caller's original untoasted data.
630  */
631  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
632  {
633  /* toast table entries should never be recursively toasted */
634  Assert(!HeapTupleHasExternal(tup));
635  heaptup = tup;
636  }
637  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
638  {
639  int options = HEAP_INSERT_SKIP_FSM;
640 
641  /*
642  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
643  * for the TOAST table are not logically decoded. The main heap is
644  * WAL-logged as XLOG FPI records, which are not logically decoded.
645  */
646  options |= HEAP_INSERT_NO_LOGICAL;
647 
648  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
649  options);
650  }
651  else
652  heaptup = tup;
653 
654  len = MAXALIGN(heaptup->t_len); /* be conservative */
655 
656  /*
657  * If we're gonna fail for oversize tuple, do it right away
658  */
659  if (len > MaxHeapTupleSize)
660  ereport(ERROR,
661  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
662  errmsg("row is too big: size %zu, maximum size %zu",
663  len, MaxHeapTupleSize)));
664 
665  /* Compute desired extra freespace due to fillfactor option */
666  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
667  HEAP_DEFAULT_FILLFACTOR);
668 
669  /* Now we can check to see if there's enough free space already. */
670  if (state->rs_buffer_valid)
671  {
672  pageFreeSpace = PageGetHeapFreeSpace(page);
673 
674  if (len + saveFreeSpace > pageFreeSpace)
675  {
676  /* Doesn't fit, so write out the existing page */
677 
678  /* XLOG stuff */
679  if (RelationNeedsWAL(state->rs_new_rel))
680  log_newpage(&state->rs_new_rel->rd_node,
681  MAIN_FORKNUM,
682  state->rs_blockno,
683  page,
684  true);
685 
686  /*
687  * Now write the page. We say skipFsync = true because there's no
688  * need for smgr to schedule an fsync for this write; we'll do it
689  * ourselves in end_heap_rewrite.
690  */
691  RelationOpenSmgr(state->rs_new_rel);
692 
693  PageSetChecksumInplace(page, state->rs_blockno);
694 
695  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
696  state->rs_blockno, (char *) page, true);
697 
698  state->rs_blockno++;
699  state->rs_buffer_valid = false;
700  }
701  }
702 
703  if (!state->rs_buffer_valid)
704  {
705  /* Initialize a new empty page */
706  PageInit(page, BLCKSZ, 0);
707  state->rs_buffer_valid = true;
708  }
709 
710  /* And now we can insert the tuple into the page */
711  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
712  InvalidOffsetNumber, false, true);
713  if (newoff == InvalidOffsetNumber)
714  elog(ERROR, "failed to add tuple");
715 
716  /* Update caller's t_self to the actual position where it was stored */
717  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
718 
719  /*
720  * Insert the correct position into CTID of the stored tuple, too, if the
721  * caller didn't supply a valid CTID.
722  */
723  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
724  {
725  ItemId newitemid;
726  HeapTupleHeader onpage_tup;
727 
728  newitemid = PageGetItemId(page, newoff);
729  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
730 
731  onpage_tup->t_ctid = tup->t_self;
732  }
733 
734  /* If heaptup is a private copy, release it. */
735  if (heaptup != tup)
736  heap_freetuple(heaptup);
737 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:135
int errcode(int sqlerrcode)
Definition: elog.c:610
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
Form_pg_class rd_rel
Definition: rel.h:109
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:513
#define ERROR
Definition: elog.h:43
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:658
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:340
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
#define InvalidOffsetNumber
Definition: off.h:26
#define ereport(elevel,...)
Definition: elog.h:144
RelFileNode rd_node
Definition: rel.h:55
#define Assert(condition)
Definition: c.h:738
size_t Size
Definition: c.h:466
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1194
#define MAXALIGN(LEN)
Definition: c.h:691
#define RelationNeedsWAL(relation)
Definition: rel.h:562
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:34
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:673
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:36
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:977
#define elog(elevel,...)
Definition: elog.h:214
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:311
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
BlockNumber rs_blockno
Definition: rewriteheap.c:137
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 565 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

566 {
567  /*
568  * If we have already seen an earlier tuple in the update chain that
569  * points to this tuple, let's forget about that earlier tuple. It's in
570  * fact dead as well, our simple xmax < OldestXmin test in
571  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
572  * when xmin of a tuple is greater than xmax, which sounds
573  * counter-intuitive but is perfectly valid.
574  *
575  * We don't bother to try to detect the situation the other way round,
576  * when we encounter the dead tuple first and then the recently dead one
577  * that points to it. If that happens, we'll have some unmatched entries
578  * in the UnresolvedTups hash table at the end. That can happen anyway,
579  * because a vacuum might have removed the dead tuple in the chain before
580  * us.
581  */
582  UnresolvedTup unresolved;
583  TidHashKey hashkey;
584  bool found;
585 
586  memset(&hashkey, 0, sizeof(hashkey));
587  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
588  hashkey.tid = old_tuple->t_self;
589 
590  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
591  HASH_FIND, NULL);
592 
593  if (unresolved != NULL)
594  {
595  /* Need to free the contained tuple as well as the hashtable entry */
596  heap_freetuple(unresolved->tuple);
597  hash_search(state->rs_unresolved_tups, &hashkey,
598  HASH_REMOVE, &found);
599  Assert(found);
600  return true;
601  }
602 
603  return false;
604 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:738
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:165

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 363 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

365 {
366  MemoryContext old_cxt;
367  ItemPointerData old_tid;
368  TidHashKey hashkey;
369  bool found;
370  bool free_new;
371 
372  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
373 
374  /*
375  * Copy the original tuple's visibility information into new_tuple.
376  *
377  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
378  * we intentionally clear the HOT status bits.
379  */
380  memcpy(&new_tuple->t_data->t_choice.t_heap,
381  &old_tuple->t_data->t_choice.t_heap,
382  sizeof(HeapTupleFields));
383 
384  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
385  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
386  new_tuple->t_data->t_infomask |=
387  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
388 
389  /*
390  * While we have our hands on the tuple, we may as well freeze any
391  * eligible xmin or xmax, so that future VACUUM effort can be saved.
392  */
393  heap_freeze_tuple(new_tuple->t_data,
394  state->rs_old_rel->rd_rel->relfrozenxid,
395  state->rs_old_rel->rd_rel->relminmxid,
396  state->rs_freeze_xid,
397  state->rs_cutoff_multi);
398 
399  /*
400  * Invalid ctid means that ctid should point to the tuple itself. We'll
401  * override it later if the tuple is part of an update chain.
402  */
403  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
404 
405  /*
406  * If the tuple has been updated, check the old-to-new mapping hash table.
407  */
408  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
409  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
410  !HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
411  !(ItemPointerEquals(&(old_tuple->t_self),
412  &(old_tuple->t_data->t_ctid))))
413  {
414  OldToNewMapping mapping;
415 
416  memset(&hashkey, 0, sizeof(hashkey));
417  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
418  hashkey.tid = old_tuple->t_data->t_ctid;
419 
420  mapping = (OldToNewMapping)
421  hash_search(state->rs_old_new_tid_map, &hashkey,
422  HASH_FIND, NULL);
423 
424  if (mapping != NULL)
425  {
426  /*
427  * We've already copied the tuple that t_ctid points to, so we can
428  * set the ctid of this tuple to point to the new location, and
429  * insert it right away.
430  */
431  new_tuple->t_data->t_ctid = mapping->new_tid;
432 
433  /* We don't need the mapping entry anymore */
434  hash_search(state->rs_old_new_tid_map, &hashkey,
435  HASH_REMOVE, &found);
436  Assert(found);
437  }
438  else
439  {
440  /*
441  * We haven't seen the tuple t_ctid points to yet. Stash this
442  * tuple into unresolved_tups to be written later.
443  */
444  UnresolvedTup unresolved;
445 
446  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
447  HASH_ENTER, &found);
448  Assert(!found);
449 
450  unresolved->old_tid = old_tuple->t_self;
451  unresolved->tuple = heap_copytuple(new_tuple);
452 
453  /*
454  * We can't do anything more now, since we don't know where the
455  * tuple will be written.
456  */
457  MemoryContextSwitchTo(old_cxt);
458  return;
459  }
460  }
461 
462  /*
463  * Now we will write the tuple, and then check to see if it is the B tuple
464  * in any new or known pair. When we resolve a known pair, we will be
465  * able to write that pair's A tuple, and then we have to check if it
466  * resolves some other pair. Hence, we need a loop here.
467  */
468  old_tid = old_tuple->t_self;
469  free_new = false;
470 
471  for (;;)
472  {
473  ItemPointerData new_tid;
474 
475  /* Insert the tuple and find out where it's put in new_heap */
476  raw_heap_insert(state, new_tuple);
477  new_tid = new_tuple->t_self;
478 
479  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
480 
481  /*
482  * If the tuple is the updated version of a row, and the prior version
483  * wouldn't be DEAD yet, then we need to either resolve the prior
484  * version (if it's waiting in rs_unresolved_tups), or make an entry
485  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
486  * previous tuple's xmax would equal this one's xmin, so it's
487  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
488  */
489  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
490  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
491  state->rs_oldest_xmin))
492  {
493  /*
494  * Okay, this is B in an update pair. See if we've seen A.
495  */
496  UnresolvedTup unresolved;
497 
498  memset(&hashkey, 0, sizeof(hashkey));
499  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
500  hashkey.tid = old_tid;
501 
502  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
503  HASH_FIND, NULL);
504 
505  if (unresolved != NULL)
506  {
507  /*
508  * We have seen and memorized the previous tuple already. Now
509  * that we know where we inserted the tuple its t_ctid points
510  * to, fix its t_ctid and insert it to the new heap.
511  */
512  if (free_new)
513  heap_freetuple(new_tuple);
514  new_tuple = unresolved->tuple;
515  free_new = true;
516  old_tid = unresolved->old_tid;
517  new_tuple->t_data->t_ctid = new_tid;
518 
519  /*
520  * We don't need the hash entry anymore, but don't free its
521  * tuple just yet.
522  */
523  hash_search(state->rs_unresolved_tups, &hashkey,
524  HASH_REMOVE, &found);
525  Assert(found);
526 
527  /* loop back to insert the previous tuple in the chain */
528  continue;
529  }
530  else
531  {
532  /*
533  * Remember the new tid of this tuple. We'll use it to set the
534  * ctid when we find the previous tuple in the chain.
535  */
536  OldToNewMapping mapping;
537 
538  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
539  HASH_ENTER, &found);
540  Assert(!found);
541 
542  mapping->new_tid = new_tid;
543  }
544  }
545 
546  /* Done with this (chain of) tuples, for now */
547  if (free_new)
548  heap_freetuple(new_tuple);
549  break;
550  }
551 
552  MemoryContextSwitchTo(old_cxt);
553 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1038
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:175
Form_pg_class rd_rel
Definition: rel.h:109
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:184
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6347
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
MemoryContext rs_cxt
Definition: rewriteheap.c:148
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:738
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
union HeapTupleHeaderData::@44 t_choice
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:165
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:615
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:134
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:187