PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 187 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 179 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 237 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, and RewriteStateData::rs_unresolved_tups.

Referenced by heapam_relation_copy_for_cluster().

239 {
240  RewriteState state;
241  MemoryContext rw_cxt;
242  MemoryContext old_cxt;
243  HASHCTL hash_ctl;
244 
245  /*
246  * To ease cleanup, make a separate context that will contain the
247  * RewriteState struct itself plus all subsidiary data.
248  */
249  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
250  "Table rewrite",
251  ALLOCSET_DEFAULT_SIZES);
252  old_cxt = MemoryContextSwitchTo(rw_cxt);
253 
254  /* Create and fill in the state struct */
255  state = palloc0(sizeof(RewriteStateData));
256 
257  state->rs_old_rel = old_heap;
258  state->rs_new_rel = new_heap;
259  state->rs_buffer = (Page) palloc(BLCKSZ);
260  /* new_heap needn't be empty, just locked */
261  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
262  state->rs_buffer_valid = false;
263  state->rs_oldest_xmin = oldest_xmin;
264  state->rs_freeze_xid = freeze_xid;
265  state->rs_cutoff_multi = cutoff_multi;
266  state->rs_cxt = rw_cxt;
267 
268  /* Initialize hash tables used to track update chains */
269  hash_ctl.keysize = sizeof(TidHashKey);
270  hash_ctl.entrysize = sizeof(UnresolvedTupData);
271  hash_ctl.hcxt = state->rs_cxt;
272 
273  state->rs_unresolved_tups =
274  hash_create("Rewrite / Unresolved ctids",
275  128, /* arbitrary initial size */
276  &hash_ctl,
277  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
278 
279  hash_ctl.entrysize = sizeof(OldToNewMappingData);
280 
281  state->rs_old_new_tid_map =
282  hash_create("Rewrite / Old to new tid map",
283  128, /* arbitrary initial size */
284  &hash_ctl,
285  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
286 
287  MemoryContextSwitchTo(old_cxt);
288 
289  logical_begin_heap_rewrite(state);
290 
291  return state;
292 }
#define AllocSetContextCreate
Definition: memutils.h:173
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
Relation rs_new_rel
Definition: rewriteheap.c:135
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
void * palloc0(Size size)
Definition: mcxt.c:1093
Size keysize
Definition: hsearch.h:75
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:212
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:794
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
Definition: regguts.h:317
void * palloc(Size size)
Definition: mcxt.c:1062
Relation rs_old_rel
Definition: rewriteheap.c:134
BlockNumber rs_blockno
Definition: rewriteheap.c:137
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1192 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat::st_mode, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1193 {
1194  XLogRecPtr cutoff;
1195  XLogRecPtr redo;
1196  DIR *mappings_dir;
1197  struct dirent *mapping_de;
1198  char path[MAXPGPATH + 20];
1199 
1200  /*
1201  * We start off with a minimum of the last redo pointer. No new decoding
1202  * slot will start before that, so that's a safe upper bound for removal.
1203  */
1204  redo = GetRedoRecPtr();
1205 
1206  /* now check for the restart ptrs from existing slots */
1207  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1208 
1209  /* don't start earlier than the restart lsn */
1210  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1211  cutoff = redo;
1212 
1213  mappings_dir = AllocateDir("pg_logical/mappings");
1214  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1215  {
1216  struct stat statbuf;
1217  Oid dboid;
1218  Oid relid;
1219  XLogRecPtr lsn;
1220  TransactionId rewrite_xid;
1221  TransactionId create_xid;
1222  uint32 hi,
1223  lo;
1224 
1225  if (strcmp(mapping_de->d_name, ".") == 0 ||
1226  strcmp(mapping_de->d_name, "..") == 0)
1227  continue;
1228 
1229  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1230  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1231  continue;
1232 
1233  /* Skip over files that cannot be ours. */
1234  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1235  continue;
1236 
1237  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1238  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1239  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1240 
1241  lsn = ((uint64) hi) << 32 | lo;
1242 
1243  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1244  {
1245  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1246  if (unlink(path) < 0)
1247  ereport(ERROR,
1248  (errcode_for_file_access(),
1249  errmsg("could not remove file \"%s\": %m", path)));
1250  }
1251  else
1252  {
1253  /* on some operating systems fsyncing a file requires O_RDWR */
1254  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1255 
1256  /*
1257  * The file cannot vanish due to concurrency since this function
1258  * is the only one removing logical mappings and only one
1259  * checkpoint can be in progress at a time.
1260  */
1261  if (fd < 0)
1262  ereport(ERROR,
1263  (errcode_for_file_access(),
1264  errmsg("could not open file \"%s\": %m", path)));
1265 
1266  /*
1267  * We could try to avoid fsyncing files that either haven't
1268  * changed or have only been created since the checkpoint's start,
1269  * but it's currently not deemed worth the effort.
1270  */
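1271  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1272  if (pg_fsync(fd) != 0)
1273  ereport(data_sync_elevel(ERROR),
1274  (errcode_for_file_access(),
1275  errmsg("could not fsync file \"%s\": %m", path)));
1276  pgstat_report_wait_end();
1277 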
1278  if (CloseTransientFile(fd) != 0)
1279  ereport(ERROR,
1280  (errcode_for_file_access(),
1281  errmsg("could not close file \"%s\": %m", path)));
1282  }
1283  }
1284  FreeDir(mappings_dir);
1285 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2509
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:860
unsigned int uint32
Definition: c.h:441
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2720
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:262
#define S_ISREG(m)
Definition: win32_port.h:327
int CloseTransientFile(int fd)
Definition: fd.c:2686
int data_sync_elevel(int elevel)
Definition: fd.c:3805
#define ereport(elevel,...)
Definition: elog.h:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2786
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8620
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:284
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int pg_fsync(int fd)
Definition: fd.c:357
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:217
int FreeDir(DIR *dir)
Definition: fd.c:2838

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 300 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationGetSmgr(), RelationNeedsWAL, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, smgrextend(), smgrimmedsync(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

301 {
302  HASH_SEQ_STATUS seq_status;
303  UnresolvedTup unresolved;
304 
305  /*
306  * Write any remaining tuples in the UnresolvedTups table. If we have any
307  * left, they should in fact be dead, but let's err on the safe side.
308  */
309  hash_seq_init(&seq_status, state->rs_unresolved_tups);
310 
311  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
312  {
313  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
314  raw_heap_insert(state, unresolved->tuple);
315  }
316 
317  /* Write the last page, if any */
318  if (state->rs_buffer_valid)
319  {
320  if (RelationNeedsWAL(state->rs_new_rel))
321  log_newpage(&state->rs_new_rel->rd_node,
322  MAIN_FORKNUM,
323  state->rs_blockno,
324  state->rs_buffer,
325  true);
326 
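327  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
328 
329  smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
330  state->rs_blockno, (char *) state->rs_buffer, true);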
331  }
332 
333  /*
334  * When we WAL-logged rel pages, we must nonetheless fsync them. The
335  * reason is the same as in storage.c's RelationCopyStorage(): we're
336  * writing data that's not in shared buffers, and so a CHECKPOINT
337  * occurring during the rewriteheap operation won't have fsync'd data we
338  * wrote before the checkpoint.
339  */
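340  if (RelationNeedsWAL(state->rs_new_rel))
341  smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
342 
343  logical_end_heap_rewrite(state);
344 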
345  /* Deleting the context frees everything */
346  MemoryContextDelete(state->rs_cxt);
347 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:940
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
MemoryContext rs_cxt
Definition: rewriteheap.c:148
RelFileNode rd_node
Definition: rel.h:56
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:544
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1532
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
#define RelationNeedsWAL(relation)
Definition: rel.h:601
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:1050
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:613
BlockNumber rs_blockno
Definition: rewriteheap.c:137
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:660

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1110 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1111 {
1112  char path[MAXPGPATH];
1113  int fd;
1114  xl_heap_rewrite_mapping *xlrec;
1115  uint32 len;
1116  char *data;
1117 
1118  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1119 
1120  snprintf(path, MAXPGPATH,
1121  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1122  xlrec->mapped_db, xlrec->mapped_rel,
1123  LSN_FORMAT_ARGS(xlrec->start_lsn),
1124  xlrec->mapped_xid, XLogRecGetXid(r));
1125 
1126  fd = OpenTransientFile(path,
1127  O_CREAT | O_WRONLY | PG_BINARY);
1128  if (fd < 0)
1129  ereport(ERROR,
1130  (errcode_for_file_access(),
1131  errmsg("could not create file \"%s\": %m", path)));
1132 
1133  /*
1134  * Truncate all data that's not guaranteed to have been safely fsynced (by
1135  * previous record or by the last checkpoint).
1136  */
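1137  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1138  if (ftruncate(fd, xlrec->offset) != 0)
1139  ereport(ERROR,
1140  (errcode_for_file_access(),
1141  errmsg("could not truncate file \"%s\" to %u: %m",
1142  path, (uint32) xlrec->offset)));
1143  pgstat_report_wait_end();
1144 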
1145  data = XLogRecGetData(r) + sizeof(*xlrec);
1146 
1147  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1148 
1149  /* write out tail end of mapping file (again) */
1150  errno = 0;
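1151  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1152  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1153  {
1154  /* if write didn't set errno, assume problem is no disk space */
1155  if (errno == 0)
1156  errno = ENOSPC;
1157  ereport(ERROR,
1158  (errcode_for_file_access(),
1159  errmsg("could not write to file \"%s\": %m", path)));
1160  }
1161  pgstat_report_wait_end();
1162 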
1163  /*
1164  * Now fsync all previously written data. We could improve things and only
1165  * do this for the last write to a file, but the required bookkeeping
1166  * doesn't seem worth the trouble.
1167  */
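1168  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1169  if (pg_fsync(fd) != 0)
1170  ereport(data_sync_elevel(ERROR),
1171  (errcode_for_file_access(),
1172  errmsg("could not fsync file \"%s\": %m", path)));
1173  pgstat_report_wait_end();
1174 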
1175  if (CloseTransientFile(fd) != 0)
1176  ereport(ERROR,
1177  (errcode_for_file_access(),
1178  errmsg("could not close file \"%s\": %m", path)));
1179 }
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:320
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2509
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:262
int CloseTransientFile(int fd)
Definition: fd.c:2686
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:317
int data_sync_elevel(int elevel)
Definition: fd.c:3805
#define ereport(elevel,...)
Definition: elog.h:157
TransactionId mapped_xid
Definition: heapam_xlog.h:384
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:909
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:357
#define snprintf
Definition: port.h:217
#define ftruncate(a, b)
Definition: win32_port.h:73

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 794 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

795 {
796  HASHCTL hash_ctl;
797  TransactionId logical_xmin;
798 
799  /*
800  * We only need to persist these mappings if the rewritten table can be
801  * accessed during logical decoding, if not, we can skip doing any
802  * additional work.
803  */
804  state->rs_logical_rewrite =
805  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
806 
807  if (!state->rs_logical_rewrite)
808  return;
809 
810  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
811 
812  /*
813  * If there are no logical slots in progress we don't need to do anything,
814  * there cannot be any remappings for relevant rows yet. The relation's
815  * lock protects us against races.
816  */
817  if (logical_xmin == InvalidTransactionId)
818  {
819  state->rs_logical_rewrite = false;
820  return;
821  }
822 
823  state->rs_logical_xmin = logical_xmin;
824  state->rs_begin_lsn = GetXLogInsertRecPtr();
825  state->rs_num_rewrite_mappings = 0;
826 
827  hash_ctl.keysize = sizeof(TransactionId);
828  hash_ctl.entrysize = sizeof(RewriteMappingFile);
829  hash_ctl.hcxt = state->rs_cxt;
830 
831  state->rs_logical_mappings =
832  hash_create("Logical rewrite mapping",
833  128, /* arbitrary initial size */
834  &hash_ctl,
835  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
836 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
Size entrysize
Definition: hsearch.h:76
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3890
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11946
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:657
Size keysize
Definition: hsearch.h:75
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:134

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 940 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

941 {
942  HASH_SEQ_STATUS seq_status;
943  RewriteMappingFile *src;
944 
945  /* done, no logical rewrite in progress */
946  if (!state->rs_logical_rewrite)
947  return;
948 
949  /* writeout remaining in-memory entries */
950  if (state->rs_num_rewrite_mappings > 0)
951  logical_heap_rewrite_flush_mappings(state);
952 
953  /* Iterate over all mappings we have written and fsync the files. */
954  hash_seq_init(&seq_status, state->rs_logical_mappings);
955  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
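956  {
957  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
958  ereport(data_sync_elevel(ERROR),
959  (errcode_for_file_access(),
960  errmsg("could not fsync file \"%s\": %m", src->path)));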
961  FileClose(src->vfd);
962  }
963  /* memory context cleanup will deal with the rest */
964 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2264
int errcode_for_file_access(void)
Definition: elog.c:721
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
int data_sync_elevel(int elevel)
Definition: fd.c:3805
#define ereport(elevel,...)
Definition: elog.h:157
void FileClose(File file)
Definition: fd.c:1959
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:842
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 842 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

843 {
844  HASH_SEQ_STATUS seq_status;
845  RewriteMappingFile *src;
846  dlist_mutable_iter iter;
847 
848  Assert(state->rs_logical_rewrite);
849 
850  /* no logical rewrite in progress, no need to iterate over mappings */
851  if (state->rs_num_rewrite_mappings == 0)
852  return;
853 
854  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
855  state->rs_num_rewrite_mappings);
856 
857  hash_seq_init(&seq_status, state->rs_logical_mappings);
858  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
859  {
860  char *waldata;
861  char *waldata_start;
862  xl_heap_rewrite_mapping xlrec;
863  Oid dboid;
864  uint32 len;
865  int written;
866 
867  /* this file hasn't got any new mappings */
868  if (src->num_mappings == 0)
869  continue;
870 
871  if (state->rs_old_rel->rd_rel->relisshared)
872  dboid = InvalidOid;
873  else
874  dboid = MyDatabaseId;
875 
876  xlrec.num_mappings = src->num_mappings;
877  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
878  xlrec.mapped_xid = src->xid;
879  xlrec.mapped_db = dboid;
880  xlrec.offset = src->off;
881  xlrec.start_lsn = state->rs_begin_lsn;
882 
883  /* write all mappings consecutively */
884  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
885  waldata_start = waldata = palloc(len);
886 
887  /*
888  * collect data we need to write out, but don't modify ondisk data yet
889  */
890  dlist_foreach_modify(iter, &src->mappings)
891  {
892  RewriteMappingDataEntry *pmap;
893 
894  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
895 
896  memcpy(waldata, &pmap->map, sizeof(pmap->map));
897  waldata += sizeof(pmap->map);
898 
899  /* remove from the list and free */
900  dlist_delete(&pmap->node);
901  pfree(pmap);
902 
903  /* update bookkeeping */
904  state->rs_num_rewrite_mappings--;
905  src->num_mappings--;
906  }
907 
908  Assert(src->num_mappings == 0);
909  Assert(waldata == waldata_start + len);
910 
911  /*
912  * Note that we deviate from the usual WAL coding practices here,
913  * check the above "Logical rewrite support" comment for reasoning.
914  */
915  written = FileWrite(src->vfd, waldata_start, len, src->off,
916  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
917  if (written != len)
918  ereport(ERROR,
919  (errcode_for_file_access(),
920  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
921  written, len)));
922  src->off += len;
923 
924  XLogBeginInsert();
925  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
926  XLogRegisterData(waldata_start, len);
927 
928  /* write xlog record */
929  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
930 
931  pfree(waldata_start);
932  }
933  Assert(state->rs_num_rewrite_mappings == 0);
934 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
TransactionId xid
Definition: rewriteheap.c:195
int errcode_for_file_access(void)
Definition: elog.c:721
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:340
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:434
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2166
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
TransactionId mapped_xid
Definition: heapam_xlog.h:384
#define Assert(condition)
Definition: c.h:804
dlist_head mappings
Definition: rewriteheap.c:199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
#define elog(elevel,...)
Definition: elog.h:232
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:134
void XLogBeginInsert(void)
Definition: xloginsert.c:135
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1036 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1038 {
1039  ItemPointerData new_tid = new_tuple->t_self;
1040  TransactionId cutoff = state->rs_logical_xmin;
1041  TransactionId xmin;
1042  TransactionId xmax;
1043  bool do_log_xmin = false;
1044  bool do_log_xmax = false;
1045  LogicalRewriteMappingData map;
1046 
1047  /* no logical rewrite in progress, we don't need to log anything */
1048  if (!state->rs_logical_rewrite)
1049  return;
1050 
1051  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1052  /* use *GetUpdateXid to correctly deal with multixacts */
1053  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1054 
1055  /*
1056  * Log the mapping iff the tuple has been created recently.
1057  */
1058  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1059  do_log_xmin = true;
1060 
1061  if (!TransactionIdIsNormal(xmax))
1062  {
1063  /*
1064  * no xmax is set, can't have any permanent ones, so this check is
1065  * sufficient
1066  */
1067  }
1068  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1069  {
1070  /* only locked, we don't care */
1071  }
1072  else if (!TransactionIdPrecedes(xmax, cutoff))
1073  {
1074  /* tuple has been deleted recently, log */
1075  do_log_xmax = true;
1076  }
1077 
1078  /* if neither needs to be logged, we're done */
1079  if (!do_log_xmin && !do_log_xmax)
1080  return;
1081 
1082  /* fill out mapping information */
1083  map.old_node = state->rs_old_rel->rd_node;
1084  map.old_tid = old_tid;
1085  map.new_node = state->rs_new_rel->rd_node;
1086  map.new_tid = new_tid;
1087 
1088  /* ---
1089  * Now persist the mapping for the individual xids that are affected. We
1090  * need to log for both xmin and xmax if they aren't the same transaction
1091  * since the mapping files are per "affected" xid.
1092  * We don't muster all that much effort detecting whether xmin and xmax
1093  * are actually the same transaction, we just check whether the xid is the
1094  * same disregarding subtransactions. Logging too much is relatively
1095  * harmless and we could never do the check fully since subtransaction
1096  * data is thrown away during restarts.
1097  * ---
1098  */
1099  if (do_log_xmin)
1100  logical_rewrite_log_mapping(state, xmin, &map);
1101  /* separately log mapping for xmax unless it'd be redundant */
1102  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1103  logical_rewrite_log_mapping(state, xmax, &map);
1104 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:587
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:970
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:56
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:134
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 970 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

972 {
973  RewriteMappingFile *src;
974  RewriteMappingDataEntry *pmap;
975  Oid relid;
976  bool found;
977 
978  relid = RelationGetRelid(state->rs_old_rel);
979 
980  /* look for existing mappings for this 'mapped' xid */
981  src = hash_search(state->rs_logical_mappings, &xid,
982  HASH_ENTER, &found);
983 
984  /*
985  * We haven't yet had the need to map anything for this xid, create
986  * per-xid data structures.
987  */
988  if (!found)
989  {
990  char path[MAXPGPATH];
991  Oid dboid;
992 
993  if (state->rs_old_rel->rd_rel->relisshared)
994  dboid = InvalidOid;
995  else
996  dboid = MyDatabaseId;
997 
998  snprintf(path, MAXPGPATH,
999  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1000  dboid, relid,
1001  LSN_FORMAT_ARGS(state->rs_begin_lsn),
1002  xid, GetCurrentTransactionId());
1003 
1004  dlist_init(&src->mappings);
1005  src->num_mappings = 0;
1006  src->off = 0;
1007  memcpy(src->path, path, sizeof(path));
1008  src->vfd = PathNameOpenFile(path,
1009  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1010  if (src->vfd < 0)
1011  ereport(ERROR,
1012  (errcode_for_file_access(),
1013  errmsg("could not create file \"%s\": %m", path)));
1014  }
1015 
1016  pmap = MemoryContextAlloc(state->rs_cxt,
1017  sizeof(RewriteMappingDataEntry));
1018  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1019  dlist_push_tail(&src->mappings, &pmap->node);
1020  src->num_mappings++;
1021  state->rs_num_rewrite_mappings++;
1022 
1023  /*
1024  * Write out buffer every time we've too many in-memory entries across all
1025  * mapping files.
1026  */
1027  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1028  logical_heap_rewrite_flush_mappings(state);
1029 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1564
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1271
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:46
char path[MAXPGPATH]
Definition: rewriteheap.c:200
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
int errcode_for_file_access(void)
Definition: elog.c:721
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
MemoryContext rs_cxt
Definition: rewriteheap.c:148
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define ereport(elevel,...)
Definition: elog.h:157
dlist_head mappings
Definition: rewriteheap.c:199
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:842
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:909
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:863
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
Relation rs_old_rel
Definition: rewriteheap.c:134
#define snprintf
Definition: port.h:217
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 613 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationGetSmgr(), RelationGetTargetPageFreeSpace, RelationNeedsWAL, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

614 {
615  Page page = state->rs_buffer;
616  Size pageFreeSpace,
617  saveFreeSpace;
618  Size len;
619  OffsetNumber newoff;
620  HeapTuple heaptup;
621 
622  /*
623  * If the new tuple is too big for storage or contains already toasted
624  * out-of-line attributes from some other relation, invoke the toaster.
625  *
626  * Note: below this point, heaptup is the data we actually intend to store
627  * into the relation; tup is the caller's original untoasted data.
628  */
629  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
630  {
631  /* toast table entries should never be recursively toasted */
632  Assert(!HeapTupleHasExternal(tup));
633  heaptup = tup;
634  }
635  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
636  {
637  int options = HEAP_INSERT_SKIP_FSM;
638 
639  /*
640  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
641  * for the TOAST table are not logically decoded. The main heap is
642  * WAL-logged as XLOG FPI records, which are not logically decoded.
643  */
644  options |= HEAP_INSERT_NO_LOGICAL;
645 
646  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
647  options);
648  }
649  else
650  heaptup = tup;
651 
652  len = MAXALIGN(heaptup->t_len); /* be conservative */
653 
654  /*
655  * If we're gonna fail for oversize tuple, do it right away
656  */
657  if (len > MaxHeapTupleSize)
658  ereport(ERROR,
659  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
660  errmsg("row is too big: size %zu, maximum size %zu",
661  len, MaxHeapTupleSize)));
662 
663  /* Compute desired extra freespace due to fillfactor option */
664  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
665  HEAP_DEFAULT_FILLFACTOR);
666 
667  /* Now we can check to see if there's enough free space already. */
668  if (state->rs_buffer_valid)
669  {
670  pageFreeSpace = PageGetHeapFreeSpace(page);
671 
672  if (len + saveFreeSpace > pageFreeSpace)
673  {
674  /*
675  * Doesn't fit, so write out the existing page. It always
676  * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
677  * enforce saveFreeSpace unconditionally.
678  */
679 
680  /* XLOG stuff */
681  if (RelationNeedsWAL(state->rs_new_rel))
682  log_newpage(&state->rs_new_rel->rd_node,
683  MAIN_FORKNUM,
684  state->rs_blockno,
685  page,
686  true);
687 
688  /*
689  * Now write the page. We say skipFsync = true because there's no
690  * need for smgr to schedule an fsync for this write; we'll do it
691  * ourselves in end_heap_rewrite.
692  */
693  PageSetChecksumInplace(page, state->rs_blockno);
694 
695  smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
696  state->rs_blockno, (char *) page, true);
697 
698  state->rs_blockno++;
699  state->rs_buffer_valid = false;
700  }
701  }
702 
703  if (!state->rs_buffer_valid)
704  {
705  /* Initialize a new empty page */
706  PageInit(page, BLCKSZ, 0);
707  state->rs_buffer_valid = true;
708  }
709 
710  /* And now we can insert the tuple into the page */
711  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
712  InvalidOffsetNumber, false, true);
713  if (newoff == InvalidOffsetNumber)
714  elog(ERROR, "failed to add tuple");
715 
716  /* Update caller's t_self to the actual position where it was stored */
717  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
718 
719  /*
720  * Insert the correct position into CTID of the stored tuple, too, if the
721  * caller didn't supply a valid CTID.
722  */
723  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
724  {
725  ItemId newitemid;
726  HeapTupleHeader onpage_tup;
727 
728  newitemid = PageGetItemId(page, newoff);
729  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
730 
731  onpage_tup->t_ctid = tup->t_self;
732  }
733 
734  /* If heaptup is a private copy, release it. */
735  if (heaptup != tup)
736  heap_freetuple(heaptup);
737 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:135
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
Form_pg_class rd_rel
Definition: rel.h:109
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define ERROR
Definition: elog.h:46
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:984
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:559
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:361
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
#define InvalidOffsetNumber
Definition: off.h:26
#define ereport(elevel,...)
Definition: elog.h:157
RelFileNode rd_node
Definition: rel.h:56
#define Assert(condition)
Definition: c.h:804
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:544
size_t Size
Definition: c.h:540
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1532
#define MAXALIGN(LEN)
Definition: c.h:757
#define RelationNeedsWAL(relation)
Definition: rel.h:601
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:34
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:672
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:36
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:1050
#define elog(elevel,...)
Definition: elog.h:232
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:332
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
BlockNumber rs_blockno
Definition: rewriteheap.c:137
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 563 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

564 {
565  /*
566  * Drop any stashed tuple whose update-chain successor is old_tuple; return
567  * true if one was found and removed, false otherwise.
568  *
569  * If we have already seen an earlier tuple in the update chain that points
570  * to this tuple, forget about that earlier tuple. It's in fact dead as well;
571  * our simple xmax < OldestXmin test in HeapTupleSatisfiesVacuum just wasn't
572  * enough to detect it. It happens when xmin of a tuple is greater than
573  * xmax, which sounds counter-intuitive but is perfectly valid.
574  *
575  * We don't bother to detect the reverse order (dead tuple first, then the
576  * recently dead one that points to it). If that happens, we'll have some
577  * unmatched entries in the UnresolvedTups hash table at the end. That can
578  * happen anyway: a vacuum might have removed the dead tuple before us.
579  */
580  UnresolvedTup unresolved;
581  TidHashKey hashkey;
582  bool found;
583 
584  memset(&hashkey, 0, sizeof(hashkey)); /* zero whole key first; presumably so padding bytes hash consistently -- confirm */
585  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
586  hashkey.tid = old_tuple->t_self;
587 
588  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
589  HASH_FIND, NULL);
590 
591  if (unresolved != NULL)
592  {
593  /* Need to free the contained tuple as well as the hashtable entry */
594  heap_freetuple(unresolved->tuple);
595  hash_search(state->rs_unresolved_tups, &hashkey,
596  HASH_REMOVE, &found);
597  Assert(found);
598  return true;
599  }
600 
601  return false;
602 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:165

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 361 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

363 {
364  MemoryContext old_cxt;
365  ItemPointerData old_tid;
366  TidHashKey hashkey;
367  bool found;
368  bool free_new;
369 
370  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
371 
372  /*
373  * Copy the original tuple's visibility information into new_tuple.
374  *
375  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
376  * we intentionally clear the HOT status bits.
377  */
378  memcpy(&new_tuple->t_data->t_choice.t_heap,
379  &old_tuple->t_data->t_choice.t_heap,
380  sizeof(HeapTupleFields));
381 
382  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
383  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
384  new_tuple->t_data->t_infomask |=
385  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
386 
387  /*
388  * While we have our hands on the tuple, we may as well freeze any
389  * eligible xmin or xmax, so that future VACUUM effort can be saved.
390  */
391  heap_freeze_tuple(new_tuple->t_data,
392  state->rs_old_rel->rd_rel->relfrozenxid,
393  state->rs_old_rel->rd_rel->relminmxid,
394  state->rs_freeze_xid,
395  state->rs_cutoff_multi);
396 
397  /*
398  * Invalid ctid means that ctid should point to the tuple itself. We'll
399  * override it later if the tuple is part of an update chain.
400  */
401  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
402 
403  /*
404  * If the tuple has been updated, check the old-to-new mapping hash table.
405  */
406  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
407  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
408  !HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
409  !(ItemPointerEquals(&(old_tuple->t_self),
410  &(old_tuple->t_data->t_ctid))))
411  {
412  OldToNewMapping mapping;
413 
414  memset(&hashkey, 0, sizeof(hashkey));
415  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
416  hashkey.tid = old_tuple->t_data->t_ctid;
417 
418  mapping = (OldToNewMapping)
419  hash_search(state->rs_old_new_tid_map, &hashkey,
420  HASH_FIND, NULL);
421 
422  if (mapping != NULL)
423  {
424  /*
425  * We've already copied the tuple that t_ctid points to, so we can
426  * set the ctid of this tuple to point to the new location, and
427  * insert it right away.
428  */
429  new_tuple->t_data->t_ctid = mapping->new_tid;
430 
431  /* We don't need the mapping entry anymore */
432  hash_search(state->rs_old_new_tid_map, &hashkey,
433  HASH_REMOVE, &found);
434  Assert(found);
435  }
436  else
437  {
438  /*
439  * We haven't seen the tuple t_ctid points to yet. Stash this
440  * tuple into unresolved_tups to be written later.
441  */
442  UnresolvedTup unresolved;
443 
444  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
445  HASH_ENTER, &found);
446  Assert(!found);
447 
448  unresolved->old_tid = old_tuple->t_self;
449  unresolved->tuple = heap_copytuple(new_tuple);
450 
451  /*
452  * We can't do anything more now, since we don't know where the
453  * tuple will be written.
454  */
455  MemoryContextSwitchTo(old_cxt);
456  return;
457  }
458  }
459 
460  /*
461  * Now we will write the tuple, and then check to see if it is the B tuple
462  * in any new or known pair. When we resolve a known pair, we will be
463  * able to write that pair's A tuple, and then we have to check if it
464  * resolves some other pair. Hence, we need a loop here.
465  */
466  old_tid = old_tuple->t_self;
467  free_new = false;
468 
469  for (;;)
470  {
471  ItemPointerData new_tid;
472 
473  /* Insert the tuple and find out where it's put in new_heap */
474  raw_heap_insert(state, new_tuple);
475  new_tid = new_tuple->t_self;
476 
477  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
478 
479  /*
480  * If the tuple is the updated version of a row, and the prior version
481  * wouldn't be DEAD yet, then we need to either resolve the prior
482  * version (if it's waiting in rs_unresolved_tups), or make an entry
483  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
484  * previous tuple's xmax would equal this one's xmin, so it's
485  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
486  */
487  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
488  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
489  state->rs_oldest_xmin))
490  {
491  /*
492  * Okay, this is B in an update pair. See if we've seen A.
493  */
494  UnresolvedTup unresolved;
495 
496  memset(&hashkey, 0, sizeof(hashkey));
497  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
498  hashkey.tid = old_tid;
499 
500  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
501  HASH_FIND, NULL);
502 
503  if (unresolved != NULL)
504  {
505  /*
506  * We have seen and memorized the previous tuple already. Now
507  * that we know where we inserted the tuple its t_ctid points
508  * to, fix its t_ctid and insert it to the new heap.
509  */
510  if (free_new)
511  heap_freetuple(new_tuple);
512  new_tuple = unresolved->tuple;
513  free_new = true;
514  old_tid = unresolved->old_tid;
515  new_tuple->t_data->t_ctid = new_tid;
516 
517  /*
518  * We don't need the hash entry anymore, but don't free its
519  * tuple just yet.
520  */
521  hash_search(state->rs_unresolved_tups, &hashkey,
522  HASH_REMOVE, &found);
523  Assert(found);
524 
525  /* loop back to insert the previous tuple in the chain */
526  continue;
527  }
528  else
529  {
530  /*
531  * Remember the new tid of this tuple. We'll use it to set the
532  * ctid when we find the previous tuple in the chain.
533  */
534  OldToNewMapping mapping;
535 
536  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
537  HASH_ENTER, &found);
538  Assert(!found);
539 
540  mapping->new_tid = new_tid;
541  }
542  }
543 
544  /* Done with this (chain of) tuples, for now */
545  if (free_new)
546  heap_freetuple(new_tuple);
547  break;
548  }
549 
550  MemoryContextSwitchTo(old_cxt);
551 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1036
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:175
Form_pg_class rd_rel
Definition: rel.h:109
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
union HeapTupleHeaderData::@43 t_choice
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:184
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6652
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
MemoryContext rs_cxt
Definition: rewriteheap.c:148
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:165
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:613
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:134
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:187