PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 187 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 179 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 237 of file rewriteheap.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc(), palloc0(), RelationGetNumberOfBlocks, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, and RewriteStateData::rs_unresolved_tups.

Referenced by heapam_relation_copy_for_cluster().

239 {
240  RewriteState state;
241  MemoryContext rw_cxt;
242  MemoryContext old_cxt;
243  HASHCTL hash_ctl;
244 
245  /*
246  * To ease cleanup, make a separate context that will contain the
247  * RewriteState struct itself plus all subsidiary data.
248  */
249  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
250  "Table rewrite",
251  ALLOCSET_DEFAULT_SIZES);
252  old_cxt = MemoryContextSwitchTo(rw_cxt);
253 
254  /* Create and fill in the state struct */
255  state = palloc0(sizeof(RewriteStateData));
256 
257  state->rs_old_rel = old_heap;
258  state->rs_new_rel = new_heap;
259  state->rs_buffer = (Page) palloc(BLCKSZ);
260  /* new_heap needn't be empty, just locked */
261  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
262  state->rs_buffer_valid = false;
263  state->rs_oldest_xmin = oldest_xmin;
264  state->rs_freeze_xid = freeze_xid;
265  state->rs_cutoff_multi = cutoff_multi;
266  state->rs_cxt = rw_cxt;
267 
268  /* Initialize hash tables used to track update chains */
269  hash_ctl.keysize = sizeof(TidHashKey);
270  hash_ctl.entrysize = sizeof(UnresolvedTupData);
271  hash_ctl.hcxt = state->rs_cxt;
272 
273  state->rs_unresolved_tups =
274  hash_create("Rewrite / Unresolved ctids",
275  128, /* arbitrary initial size */
276  &hash_ctl,
277  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
278 
279  hash_ctl.entrysize = sizeof(OldToNewMappingData);
280 
281  state->rs_old_new_tid_map =
282  hash_create("Rewrite / Old to new tid map",
283  128, /* arbitrary initial size */
284  &hash_ctl,
285  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
286 
287  MemoryContextSwitchTo(old_cxt);
288 
289  logical_begin_heap_rewrite(state);
290 
291  return state;
292 }
#define AllocSetContextCreate
Definition: memutils.h:170
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:76
Relation rs_new_rel
Definition: rewriteheap.c:135
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
void * palloc0(Size size)
Definition: mcxt.c:981
Size keysize
Definition: hsearch.h:75
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:211
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:793
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
Definition: regguts.h:317
void * palloc(Size size)
Definition: mcxt.c:950
Relation rs_old_rel
Definition: rewriteheap.c:134
BlockNumber rs_blockno
Definition: rewriteheap.c:137
Pointer Page
Definition: bufpage.h:78

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1191 of file rewriteheap.c.

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, lstat, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), S_ISREG, snprintf, stat::st_mode, and WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC.

Referenced by CheckPointGuts().

1192 {
1193  XLogRecPtr cutoff;
1194  XLogRecPtr redo;
1195  DIR *mappings_dir;
1196  struct dirent *mapping_de;
1197  char path[MAXPGPATH + 20];
1198 
1199  /*
1200  * We start off with a minimum of the last redo pointer. No new decoding
1201  * slot will start before that, so that's a safe upper bound for removal.
1202  */
1203  redo = GetRedoRecPtr();
1204 
1205  /* now check for the restart ptrs from existing slots */
1206  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1207 
1208  /* don't start earlier than the restart lsn */
1209  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1210  cutoff = redo;
1211 
1212  mappings_dir = AllocateDir("pg_logical/mappings");
1213  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1214  {
1215  struct stat statbuf;
1216  Oid dboid;
1217  Oid relid;
1218  XLogRecPtr lsn;
1219  TransactionId rewrite_xid;
1220  TransactionId create_xid;
1221  uint32 hi,
1222  lo;
1223 
1224  if (strcmp(mapping_de->d_name, ".") == 0 ||
1225  strcmp(mapping_de->d_name, "..") == 0)
1226  continue;
1227 
1228  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1229  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1230  continue;
1231 
1232  /* Skip over files that cannot be ours. */
1233  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1234  continue;
1235 
1236  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1237  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1238  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1239 
1240  lsn = ((uint64) hi) << 32 | lo;
1241 
1242  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1243  {
1244  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1245  if (unlink(path) < 0)
1246  ereport(ERROR,
1247  (errcode_for_file_access(),
1248  errmsg("could not remove file \"%s\": %m", path)));
1249  }
1250  else
1251  {
1252  /* on some operating systems fsyncing a file requires O_RDWR */
1253  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1254 
1255  /*
1256  * The file cannot vanish due to concurrency since this function
1257  * is the only one removing logical mappings and only one
1258  * checkpoint can be in progress at a time.
1259  */
1260  if (fd < 0)
1261  ereport(ERROR,
1262  (errcode_for_file_access(),
1263  errmsg("could not open file \"%s\": %m", path)));
1264 
1265  /*
1266  * We could try to avoid fsyncing files that either haven't
1267  * changed or have only been created since the checkpoint's start,
1268  * but it's currently not deemed worth the effort.
1269  */
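1270  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1271  if (pg_fsync(fd) != 0)
1272  ereport(data_sync_elevel(ERROR),
1273  (errcode_for_file_access(),
1274  errmsg("could not fsync file \"%s\": %m", path)));
1275  pgstat_report_wait_end();
1276 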
1277  if (CloseTransientFile(fd) != 0)
1278  ereport(ERROR,
1279  (errcode_for_file_access(),
1280  errmsg("could not close file \"%s\": %m", path)));
1281  }
1282  }
1283  FreeDir(mappings_dir);
1284 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:587
unsigned int Oid
Definition: postgres_ext.h:31
Definition: dirent.h:9
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
Definition: dirent.c:25
#define ERROR
Definition: elog.h:45
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:717
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:877
unsigned int uint32
Definition: c.h:441
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2615
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
#define S_ISREG(m)
Definition: win32_port.h:319
int CloseTransientFile(int fd)
Definition: fd.c:2581
int data_sync_elevel(int elevel)
Definition: fd.c:3635
#define ereport(elevel,...)
Definition: elog.h:155
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2681
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1488
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8424
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
#define lstat(path, sb)
Definition: win32_port.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define elog(elevel,...)
Definition: elog.h:227
int pg_fsync(int fd)
Definition: fd.c:347
char d_name[MAX_PATH]
Definition: dirent.h:15
#define snprintf
Definition: port.h:216
int FreeDir(DIR *dir)
Definition: fd.c:2733

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 300 of file rewriteheap.c.

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid, log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationData::rd_node, RelationData::rd_smgr, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_cxt, RewriteStateData::rs_new_rel, RewriteStateData::rs_unresolved_tups, smgrextend(), smgrimmedsync(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

301 {
302  HASH_SEQ_STATUS seq_status;
303  UnresolvedTup unresolved;
304 
305  /*
306  * Write any remaining tuples in the UnresolvedTups table. If we have any
307  * left, they should in fact be dead, but let's err on the safe side.
308  */
309  hash_seq_init(&seq_status, state->rs_unresolved_tups);
310 
311  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
312  {
313  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
314  raw_heap_insert(state, unresolved->tuple);
315  }
316 
317  /* Write the last page, if any */
318  if (state->rs_buffer_valid)
319  {
320  if (RelationNeedsWAL(state->rs_new_rel))
321  log_newpage(&state->rs_new_rel->rd_node,
322  MAIN_FORKNUM,
323  state->rs_blockno,
324  state->rs_buffer,
325  true);
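326  RelationOpenSmgr(state->rs_new_rel);
327 
328  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
329 
330  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
331  (char *) state->rs_buffer, true);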
332  }
333 
334  /*
335  * When we WAL-logged rel pages, we must nonetheless fsync them. The
336  * reason is the same as in storage.c's RelationCopyStorage(): we're
337  * writing data that's not in shared buffers, and so a CHECKPOINT
338  * occurring during the rewriteheap operation won't have fsync'd data we
339  * wrote before the checkpoint.
340  */
341  if (RelationNeedsWAL(state->rs_new_rel))
342  smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM);
343 
344  logical_end_heap_rewrite(state);
345 
346  /* Deleting the context frees everything */
347  MemoryContextDelete(state->rs_cxt);
348 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:939
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:514
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
MemoryContext rs_cxt
Definition: rewriteheap.c:148
RelFileNode rd_node
Definition: rel.h:55
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1422
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
#define RelationNeedsWAL(relation)
Definition: rel.h:563
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:996
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:614
BlockNumber rs_blockno
Definition: rewriteheap.c:137
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:660

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1109 of file rewriteheap.c.

References CloseTransientFile(), data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite(), pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC, WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE, WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

1110 {
1111  char path[MAXPGPATH];
1112  int fd;
1113  xl_heap_rewrite_mapping *xlrec;
1114  uint32 len;
1115  char *data;
1116 
1117  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1118 
1119  snprintf(path, MAXPGPATH,
1120  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1121  xlrec->mapped_db, xlrec->mapped_rel,
1122  LSN_FORMAT_ARGS(xlrec->start_lsn),
1123  xlrec->mapped_xid, XLogRecGetXid(r));
1124 
1125  fd = OpenTransientFile(path,
1126  O_CREAT | O_WRONLY | PG_BINARY);
1127  if (fd < 0)
1128  ereport(ERROR,
1129  (errcode_for_file_access(),
1130  errmsg("could not create file \"%s\": %m", path)));
1131 
1132  /*
1133  * Truncate all data that's not guaranteed to have been safely fsynced (by
1134  * previous record or by the last checkpoint).
1135  */
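1136  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1137  if (ftruncate(fd, xlrec->offset) != 0)
1138  ereport(ERROR,
1139  (errcode_for_file_access(),
1140  errmsg("could not truncate file \"%s\" to %u: %m",
1141  path, (uint32) xlrec->offset)));
1142  pgstat_report_wait_end();
1143 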
1144  data = XLogRecGetData(r) + sizeof(*xlrec);
1145 
1146  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1147 
1148  /* write out tail end of mapping file (again) */
1149  errno = 0;
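1150  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1151  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1152  {
1153  /* if write didn't set errno, assume problem is no disk space */
1154  if (errno == 0)
1155  errno = ENOSPC;
1156  ereport(ERROR,
1157  (errcode_for_file_access(),
1158  errmsg("could not write to file \"%s\": %m", path)));
1159  }
1160  pgstat_report_wait_end();
1161 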
1162  /*
1163  * Now fsync all previously written data. We could improve things and only
1164  * do this for the last write to a file, but the required bookkeeping
1165  * doesn't seem worth the trouble.
1166  */
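1167  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1168  if (pg_fsync(fd) != 0)
1169  ereport(data_sync_elevel(ERROR),
1170  (errcode_for_file_access(),
1171  errmsg("could not fsync file \"%s\": %m", path)));
1172  pgstat_report_wait_end();
1173 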
1174  if (CloseTransientFile(fd) != 0)
1175  ereport(ERROR,
1176  (errcode_for_file_access(),
1177  errmsg("could not close file \"%s\": %m", path)));
1178 }
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define ERROR
Definition: elog.h:45
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:717
unsigned int uint32
Definition: c.h:441
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
int CloseTransientFile(int fd)
Definition: fd.c:2581
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:307
int data_sync_elevel(int elevel)
Definition: fd.c:3635
#define ereport(elevel,...)
Definition: elog.h:155
TransactionId mapped_xid
Definition: heapam_xlog.h:380
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1488
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:905
struct LogicalRewriteMappingData LogicalRewriteMappingData
int pg_fsync(int fd)
Definition: fd.c:347
#define snprintf
Definition: port.h:216
#define ftruncate(a, b)
Definition: win32_port.h:65

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 793 of file rewriteheap.c.

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), RelationIsAccessibleInLogicalDecoding, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_num_rewrite_mappings, and RewriteStateData::rs_old_rel.

Referenced by begin_heap_rewrite().

794 {
795  HASHCTL hash_ctl;
796  TransactionId logical_xmin;
797 
798  /*
799  * We only need to persist these mappings if the rewritten table can be
800  * accessed during logical decoding, if not, we can skip doing any
801  * additional work.
802  */
803  state->rs_logical_rewrite =
804  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
805 
806  if (!state->rs_logical_rewrite)
807  return;
808 
809  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
810 
811  /*
812  * If there are no logical slots in progress we don't need to do anything,
813  * there cannot be any remappings for relevant rows yet. The relation's
814  * lock protects us against races.
815  */
816  if (logical_xmin == InvalidTransactionId)
817  {
818  state->rs_logical_rewrite = false;
819  return;
820  }
821 
822  state->rs_logical_xmin = logical_xmin;
823  state->rs_begin_lsn = GetXLogInsertRecPtr();
824  state->rs_num_rewrite_mappings = 0;
825 
826  hash_ctl.keysize = sizeof(TransactionId);
827  hash_ctl.entrysize = sizeof(RewriteMappingFile);
828  hash_ctl.hcxt = state->rs_cxt;
829 
830  state->rs_logical_mappings =
831  hash_create("Logical rewrite mapping",
832  128, /* arbitrary initial size */
833  &hash_ctl,
834  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
835 }
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:587
MemoryContext hcxt
Definition: hsearch.h:86
Size entrysize
Definition: hsearch.h:76
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3819
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:11615
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
#define HASH_BLOBS
Definition: hsearch.h:97
MemoryContext rs_cxt
Definition: rewriteheap.c:148
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:620
Size keysize
Definition: hsearch.h:75
struct RewriteMappingFile RewriteMappingFile
Relation rs_old_rel
Definition: rewriteheap.c:134

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 939 of file rewriteheap.c.

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteMappingFile::vfd, and WAIT_EVENT_LOGICAL_REWRITE_SYNC.

Referenced by end_heap_rewrite().

940 {
941  HASH_SEQ_STATUS seq_status;
942  RewriteMappingFile *src;
943 
944  /* done, no logical rewrite in progress */
945  if (!state->rs_logical_rewrite)
946  return;
947 
948  /* writeout remaining in-memory entries */
949  if (state->rs_num_rewrite_mappings > 0)
950  logical_heap_rewrite_flush_mappings(state);
951 
952  /* Iterate over all mappings we have written and fsync the files. */
953  hash_seq_init(&seq_status, state->rs_logical_mappings);
954  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
955  {
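956  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
957  ereport(data_sync_elevel(ERROR),
958  (errcode_for_file_access(),
959  errmsg("could not fsync file \"%s\": %m", src->path)));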
960  FileClose(src->vfd);
961  }
962  /* memory context cleanup will deal with the rest */
963 }
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:45
char path[MAXPGPATH]
Definition: rewriteheap.c:200
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2159
int errcode_for_file_access(void)
Definition: elog.c:717
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
int data_sync_elevel(int elevel)
Definition: fd.c:3635
#define ereport(elevel,...)
Definition: elog.h:155
void FileClose(File file)
Definition: fd.c:1854
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:841
int errmsg(const char *fmt,...)
Definition: elog.c:905

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 841 of file rewriteheap.c.

References Assert, dlist_mutable_iter::cur, DEBUG1, dlist_container, dlist_delete(), dlist_foreach_modify, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, WAIT_EVENT_LOGICAL_REWRITE_WRITE, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

842 {
843  HASH_SEQ_STATUS seq_status;
844  RewriteMappingFile *src;
845  dlist_mutable_iter iter;
846 
847  Assert(state->rs_logical_rewrite);
848 
849  /* no logical rewrite in progress, no need to iterate over mappings */
850  if (state->rs_num_rewrite_mappings == 0)
851  return;
852 
853  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
854  state->rs_num_rewrite_mappings);
855 
856  hash_seq_init(&seq_status, state->rs_logical_mappings);
857  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
858  {
859  char *waldata;
860  char *waldata_start;
861  xl_heap_rewrite_mapping xlrec;
862  Oid dboid;
863  uint32 len;
864  int written;
865 
866  /* this file hasn't got any new mappings */
867  if (src->num_mappings == 0)
868  continue;
869 
870  if (state->rs_old_rel->rd_rel->relisshared)
871  dboid = InvalidOid;
872  else
873  dboid = MyDatabaseId;
874 
875  xlrec.num_mappings = src->num_mappings;
876  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
877  xlrec.mapped_xid = src->xid;
878  xlrec.mapped_db = dboid;
879  xlrec.offset = src->off;
880  xlrec.start_lsn = state->rs_begin_lsn;
881 
882  /* write all mappings consecutively */
883  len = src->num_mappings * sizeof(LogicalRewriteMappingData);
884  waldata_start = waldata = palloc(len);
885 
886  /*
887  * collect data we need to write out, but don't modify ondisk data yet
888  */
889  dlist_foreach_modify(iter, &src->mappings)
890  {
891  RewriteMappingDataEntry *pmap;
892 
893  pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
894 
895  memcpy(waldata, &pmap->map, sizeof(pmap->map));
896  waldata += sizeof(pmap->map);
897 
898  /* remove from the list and free */
899  dlist_delete(&pmap->node);
900  pfree(pmap);
901 
902  /* update bookkeeping */
903  state->rs_num_rewrite_mappings--;
904  src->num_mappings--;
905  }
906 
907  Assert(src->num_mappings == 0);
908  Assert(waldata == waldata_start + len);
909 
910  /*
911  * Note that we deviate from the usual WAL coding practices here,
912  * check the above "Logical rewrite support" comment for reasoning.
913  */
914  written = FileWrite(src->vfd, waldata_start, len, src->off,
915  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
916  if (written != len)
917  ereport(ERROR,
918  (errcode_for_file_access(),
919  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
920  written, len)));
921  src->off += len;
922 
923  XLogBeginInsert();
924  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
925  XLogRegisterData(waldata_start, len);
926 
927  /* write xlog record */
928  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
929 
930  pfree(waldata_start);
931  }
932  Assert(state->rs_num_rewrite_mappings == 0);
933 }
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:45
char path[MAXPGPATH]
Definition: rewriteheap.c:200
TransactionId xid
Definition: rewriteheap.c:195
int errcode_for_file_access(void)
Definition: elog.c:717
unsigned int uint32
Definition: c.h:441
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2061
Oid MyDatabaseId
Definition: globals.c:86
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
TransactionId mapped_xid
Definition: heapam_xlog.h:380
#define Assert(condition)
Definition: c.h:804
dlist_head mappings
Definition: rewriteheap.c:199
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:905
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
#define elog(elevel,...)
Definition: elog.h:227
struct LogicalRewriteMappingData LogicalRewriteMappingData
Relation rs_old_rel
Definition: rewriteheap.c:134
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1035 of file rewriteheap.c.

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_node, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_node, LogicalRewriteMappingData::old_tid, RelationData::rd_node, RewriteStateData::rs_logical_rewrite, RewriteStateData::rs_logical_xmin, RewriteStateData::rs_new_rel, RewriteStateData::rs_old_rel, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

1037 {
1038  ItemPointerData new_tid = new_tuple->t_self;
1039  TransactionId cutoff = state->rs_logical_xmin;
1040  TransactionId xmin;
1041  TransactionId xmax;
1042  bool do_log_xmin = false;
1043  bool do_log_xmax = false;
1044  LogicalRewriteMappingData map;
1045 
1046  /* no logical rewrite in progress, we don't need to log anything */
1047  if (!state->rs_logical_rewrite)
1048  return;
1049 
1050  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1051  /* use *GetUpdateXid to correctly deal with multixacts */
1052  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1053 
1054  /*
1055  * Log the mapping iff the tuple has been created recently.
1056  */
1057  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1058  do_log_xmin = true;
1059 
1060  if (!TransactionIdIsNormal(xmax))
1061  {
1062  /*
1063  * no xmax is set, can't have any permanent ones, so this check is
1064  * sufficient
1065  */
1066  }
1067  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1068  {
1069  /* only locked, we don't care */
1070  }
1071  else if (!TransactionIdPrecedes(xmax, cutoff))
1072  {
1073  /* tuple has been deleted recently, log */
1074  do_log_xmax = true;
1075  }
1076 
1077  /* if neither needs to be logged, we're done */
1078  if (!do_log_xmin && !do_log_xmax)
1079  return;
1080 
1081  /* fill out mapping information */
1082  map.old_node = state->rs_old_rel->rd_node;
1083  map.old_tid = old_tid;
1084  map.new_node = state->rs_new_rel->rd_node;
1085  map.new_tid = new_tid;
1086 
1087  /* ---
1088  * Now persist the mapping for the individual xids that are affected. We
1089  * need to log for both xmin and xmax if they aren't the same transaction
1090  * since the mapping files are per "affected" xid.
1091  * We don't muster all that much effort detecting whether xmin and xmax
1092  * are actually the same transaction, we just check whether the xid is the
1093  * same disregarding subtransactions. Logging too much is relatively
1094  * harmless and we could never do the check fully since subtransaction
1095  * data is thrown away during restarts.
1096  * ---
1097  */
1098  if (do_log_xmin)
1099  logical_rewrite_log_mapping(state, xmin, &map);
1100  /* separately log mapping for xmax unless it'd be redundant */
1101  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1102  logical_rewrite_log_mapping(state, xmax, &map);
1103 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
TransactionId rs_logical_xmin
Definition: rewriteheap.c:144
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:587
Relation rs_new_rel
Definition: rewriteheap.c:135
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_self
Definition: htup.h:65
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:969
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:230
RelFileNode rd_node
Definition: rel.h:55
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
Relation rs_old_rel
Definition: rewriteheap.c:134
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
ItemPointerData old_tid
Definition: rewriteheap.h:39

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 969 of file rewriteheap.c.

References dlist_init(), dlist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::num_mappings, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationData::rd_rel, RelationGetRelid, RewriteStateData::rs_begin_lsn, RewriteStateData::rs_cxt, RewriteStateData::rs_logical_mappings, RewriteStateData::rs_num_rewrite_mappings, RewriteStateData::rs_old_rel, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

971 {
972  RewriteMappingFile *src;
973  RewriteMappingDataEntry *pmap;
974  Oid relid;
975  bool found;
976 
977  relid = RelationGetRelid(state->rs_old_rel);
978 
979  /* look for existing mappings for this 'mapped' xid */
980  src = hash_search(state->rs_logical_mappings, &xid,
981  HASH_ENTER, &found);
982 
983  /*
984  * We haven't yet had the need to map anything for this xid, create
985  * per-xid data structures.
986  */
987  if (!found)
988  {
989  char path[MAXPGPATH];
990  Oid dboid;
991 
992  if (state->rs_old_rel->rd_rel->relisshared)
993  dboid = InvalidOid;
994  else
995  dboid = MyDatabaseId;
996 
997  snprintf(path, MAXPGPATH,
998  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
999  dboid, relid,
1000  LSN_FORMAT_ARGS(state->rs_begin_lsn),
1001  xid, GetCurrentTransactionId());
1002 
1003  dlist_init(&src->mappings);
1004  src->num_mappings = 0;
1005  src->off = 0;
1006  memcpy(src->path, path, sizeof(path));
1007  src->vfd = PathNameOpenFile(path,
1008  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1009  if (src->vfd < 0)
1010  ereport(ERROR,
1011  (errcode_for_file_access(),
1012  errmsg("could not create file \"%s\": %m", path)));
1013  }
1014 
1015  pmap = MemoryContextAlloc(state->rs_cxt,
1016  sizeof(RewriteMappingDataEntry));
1017  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1018  dlist_push_tail(&src->mappings, &pmap->node);
1019  src->num_mappings++;
1020  state->rs_num_rewrite_mappings++;
1021 
1022  /*
1023  * Write out buffer every time we've too many in-memory entries across all
1024  * mapping files.
1025  */
1026  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1027  logical_heap_rewrite_flush_mappings(state);
1028 }
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1465
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY
Definition: c.h:1271
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint32 rs_num_rewrite_mappings
Definition: rewriteheap.c:154
#define ERROR
Definition: elog.h:45
char path[MAXPGPATH]
Definition: rewriteheap.c:200
#define MAXPGPATH
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
int errcode_for_file_access(void)
Definition: elog.c:717
XLogRecPtr rs_begin_lsn
Definition: rewriteheap.c:150
HTAB * rs_logical_mappings
Definition: rewriteheap.c:153
MemoryContext rs_cxt
Definition: rewriteheap.c:148
Oid MyDatabaseId
Definition: globals.c:86
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
#define ereport(elevel,...)
Definition: elog.h:155
dlist_head mappings
Definition: rewriteheap.c:199
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:841
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
int errmsg(const char *fmt,...)
Definition: elog.c:905
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
LogicalRewriteMappingData map
Definition: rewriteheap.c:209
Relation rs_old_rel
Definition: rewriteheap.c:134
#define snprintf
Definition: port.h:216
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 614 of file rewriteheap.c.

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid, ItemPointerSet, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem, PageGetItemId, PageInit(), PageSetChecksumInplace(), RelationData::rd_node, RelationData::rd_rel, RelationData::rd_smgr, RelationGetTargetPageFreeSpace, RelationNeedsWAL, RelationOpenSmgr, RewriteStateData::rs_blockno, RewriteStateData::rs_buffer, RewriteStateData::rs_buffer_valid, RewriteStateData::rs_new_rel, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

615 {
616  Page page = state->rs_buffer;
617  Size pageFreeSpace,
618  saveFreeSpace;
619  Size len;
620  OffsetNumber newoff;
621  HeapTuple heaptup;
622 
623  /*
624  * If the new tuple is too big for storage or contains already toasted
625  * out-of-line attributes from some other relation, invoke the toaster.
626  *
627  * Note: below this point, heaptup is the data we actually intend to store
628  * into the relation; tup is the caller's original untoasted data.
629  */
630  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
631  {
632  /* toast table entries should never be recursively toasted */
633  Assert(!HeapTupleHasExternal(tup));
634  heaptup = tup;
635  }
636  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
637  {
638  int options = HEAP_INSERT_SKIP_FSM;
639 
640  /*
641  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
642  * for the TOAST table are not logically decoded. The main heap is
643  * WAL-logged as XLOG FPI records, which are not logically decoded.
644  */
645  options |= HEAP_INSERT_NO_LOGICAL;
646 
647  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
648  options);
649  }
650  else
651  heaptup = tup;
652 
653  len = MAXALIGN(heaptup->t_len); /* be conservative */
654 
655  /*
656  * If we're gonna fail for oversize tuple, do it right away
657  */
658  if (len > MaxHeapTupleSize)
659  ereport(ERROR,
660  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
661  errmsg("row is too big: size %zu, maximum size %zu",
662  len, MaxHeapTupleSize)));
663 
664  /* Compute desired extra freespace due to fillfactor option */
665  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
666  HEAP_DEFAULT_FILLFACTOR);
667 
668  /* Now we can check to see if there's enough free space already. */
669  if (state->rs_buffer_valid)
670  {
671  pageFreeSpace = PageGetHeapFreeSpace(page);
672 
673  if (len + saveFreeSpace > pageFreeSpace)
674  {
675  /* Doesn't fit, so write out the existing page */
676 
677  /* XLOG stuff */
678  if (RelationNeedsWAL(state->rs_new_rel))
679  log_newpage(&state->rs_new_rel->rd_node,
680  MAIN_FORKNUM,
681  state->rs_blockno,
682  page,
683  true);
684 
685  /*
686  * Now write the page. We say skipFsync = true because there's no
687  * need for smgr to schedule an fsync for this write; we'll do it
688  * ourselves in end_heap_rewrite.
689  */
690  RelationOpenSmgr(state->rs_new_rel);
691 
692  PageSetChecksumInplace(page, state->rs_blockno);
693 
694  smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
695  state->rs_blockno, (char *) page, true);
696 
697  state->rs_blockno++;
698  state->rs_buffer_valid = false;
699  }
700  }
701 
702  if (!state->rs_buffer_valid)
703  {
704  /* Initialize a new empty page */
705  PageInit(page, BLCKSZ, 0);
706  state->rs_buffer_valid = true;
707  }
708 
709  /* And now we can insert the tuple into the page */
710  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
711  InvalidOffsetNumber, false, true);
712  if (newoff == InvalidOffsetNumber)
713  elog(ERROR, "failed to add tuple");
714 
715  /* Update caller's t_self to the actual position where it was stored */
716  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
717 
718  /*
719  * Insert the correct position into CTID of the stored tuple, too, if the
720  * caller didn't supply a valid CTID.
721  */
722  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
723  {
724  ItemId newitemid;
725  HeapTupleHeader onpage_tup;
726 
727  newitemid = PageGetItemId(page, newoff);
728  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
729 
730  onpage_tup->t_ctid = tup->t_self;
731  }
732 
733  /* If heaptup is a private copy, release it. */
734  if (heaptup != tup)
735  heap_freetuple(heaptup);
736 }
#define ItemPointerIsValid(pointer)
Definition: itemptr.h:82
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
struct SMgrRelationData * rd_smgr
Definition: rel.h:57
Pointer Item
Definition: item.h:17
Relation rs_new_rel
Definition: rewriteheap.c:135
int errcode(int sqlerrcode)
Definition: elog.c:694
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:416
Form_pg_class rd_rel
Definition: rel.h:110
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
uint16 OffsetNumber
Definition: off.h:24
HeapTupleHeader t_data
Definition: htup.h:68
#define RelationOpenSmgr(relation)
Definition: rel.h:514
#define ERROR
Definition: elog.h:45
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:874
ItemPointerData t_ctid
Definition: htup_details.h:160
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:341
#define PageGetItemId(page, offsetNumber)
Definition: bufpage.h:235
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
#define InvalidOffsetNumber
Definition: off.h:26
#define ereport(elevel,...)
Definition: elog.h:155
RelFileNode rd_node
Definition: rel.h:55
#define Assert(condition)
Definition: c.h:804
size_t Size
Definition: c.h:540
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1422
#define MAXALIGN(LEN)
Definition: c.h:757
#define RelationNeedsWAL(relation)
Definition: rel.h:563
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync)
Definition: smgr.c:462
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:34
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:673
int errmsg(const char *fmt,...)
Definition: elog.c:905
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:36
XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:996
#define elog(elevel,...)
Definition: elog.h:227
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:312
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
BlockNumber rs_blockno
Definition: rewriteheap.c:137
#define PageGetItem(page, itemId)
Definition: bufpage.h:340
Pointer Page
Definition: bufpage.h:78
#define ItemPointerSet(pointer, blockNumber, offNum)
Definition: itemptr.h:127
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 564 of file rewriteheap.c.

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, RewriteStateData::rs_unresolved_tups, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

565 {
566  /*
567  * If we have already seen an earlier tuple in the update chain that
568  * points to this tuple, let's forget about that earlier tuple. It's in
569  * fact dead as well, our simple xmax < OldestXmin test in
570  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
571  * when xmin of a tuple is greater than xmax, which sounds
572  * counter-intuitive but is perfectly valid.
573  *
574  * We don't bother to try to detect the situation the other way round,
575  * when we encounter the dead tuple first and then the recently dead one
576  * that points to it. If that happens, we'll have some unmatched entries
577  * in the UnresolvedTups hash table at the end. That can happen anyway,
578  * because a vacuum might have removed the dead tuple in the chain before
579  * us.
580  */
581  UnresolvedTup unresolved;
582  TidHashKey hashkey;
583  bool found;
584 
585  memset(&hashkey, 0, sizeof(hashkey));
586  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
587  hashkey.tid = old_tuple->t_self;
588 
589  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
590  HASH_FIND, NULL);
591 
592  if (unresolved != NULL)
593  {
594  /* Need to free the contained tuple as well as the hashtable entry */
595  heap_freetuple(unresolved->tuple);
596  hash_search(state->rs_unresolved_tups, &hashkey,
597  HASH_REMOVE, &found);
598  Assert(found);
599  return true;
600  }
601 
602  return false;
603 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
HeapTupleHeader t_data
Definition: htup.h:68
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
TransactionId xmin
Definition: rewriteheap.c:165

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 362 of file rewriteheap.c.

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid, logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), RelationData::rd_rel, RewriteStateData::rs_cutoff_multi, RewriteStateData::rs_cxt, RewriteStateData::rs_freeze_xid, RewriteStateData::rs_old_new_tid_map, RewriteStateData::rs_old_rel, RewriteStateData::rs_oldest_xmin, RewriteStateData::rs_unresolved_tups, HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().

364 {
365  MemoryContext old_cxt;
366  ItemPointerData old_tid;
367  TidHashKey hashkey;
368  bool found;
369  bool free_new;
370 
371  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
372 
373  /*
374  * Copy the original tuple's visibility information into new_tuple.
375  *
376  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
377  * we intentionally clear the HOT status bits.
378  */
379  memcpy(&new_tuple->t_data->t_choice.t_heap,
380  &old_tuple->t_data->t_choice.t_heap,
381  sizeof(HeapTupleFields));
382 
383  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
384  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
385  new_tuple->t_data->t_infomask |=
386  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
387 
388  /*
389  * While we have our hands on the tuple, we may as well freeze any
390  * eligible xmin or xmax, so that future VACUUM effort can be saved.
391  */
392  heap_freeze_tuple(new_tuple->t_data,
393  state->rs_old_rel->rd_rel->relfrozenxid,
394  state->rs_old_rel->rd_rel->relminmxid,
395  state->rs_freeze_xid,
396  state->rs_cutoff_multi);
397 
398  /*
399  * Invalid ctid means that ctid should point to the tuple itself. We'll
400  * override it later if the tuple is part of an update chain.
401  */
402  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
403 
404  /*
405  * If the tuple has been updated, check the old-to-new mapping hash table.
406  */
407  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
408  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
409  !HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
410  !(ItemPointerEquals(&(old_tuple->t_self),
411  &(old_tuple->t_data->t_ctid))))
412  {
413  OldToNewMapping mapping;
414 
415  memset(&hashkey, 0, sizeof(hashkey));
416  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
417  hashkey.tid = old_tuple->t_data->t_ctid;
418 
419  mapping = (OldToNewMapping)
420  hash_search(state->rs_old_new_tid_map, &hashkey,
421  HASH_FIND, NULL);
422 
423  if (mapping != NULL)
424  {
425  /*
426  * We've already copied the tuple that t_ctid points to, so we can
427  * set the ctid of this tuple to point to the new location, and
428  * insert it right away.
429  */
430  new_tuple->t_data->t_ctid = mapping->new_tid;
431 
432  /* We don't need the mapping entry anymore */
433  hash_search(state->rs_old_new_tid_map, &hashkey,
434  HASH_REMOVE, &found);
435  Assert(found);
436  }
437  else
438  {
439  /*
440  * We haven't seen the tuple t_ctid points to yet. Stash this
441  * tuple into unresolved_tups to be written later.
442  */
443  UnresolvedTup unresolved;
444 
445  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
446  HASH_ENTER, &found);
447  Assert(!found);
448 
449  unresolved->old_tid = old_tuple->t_self;
450  unresolved->tuple = heap_copytuple(new_tuple);
451 
452  /*
453  * We can't do anything more now, since we don't know where the
454  * tuple will be written.
455  */
456  MemoryContextSwitchTo(old_cxt);
457  return;
458  }
459  }
460 
461  /*
462  * Now we will write the tuple, and then check to see if it is the B tuple
463  * in any new or known pair. When we resolve a known pair, we will be
464  * able to write that pair's A tuple, and then we have to check if it
465  * resolves some other pair. Hence, we need a loop here.
466  */
467  old_tid = old_tuple->t_self;
468  free_new = false;
469 
470  for (;;)
471  {
472  ItemPointerData new_tid;
473 
474  /* Insert the tuple and find out where it's put in new_heap */
475  raw_heap_insert(state, new_tuple);
476  new_tid = new_tuple->t_self;
477 
478  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
479 
480  /*
481  * If the tuple is the updated version of a row, and the prior version
482  * wouldn't be DEAD yet, then we need to either resolve the prior
483  * version (if it's waiting in rs_unresolved_tups), or make an entry
484  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
485  * previous tuple's xmax would equal this one's xmin, so it's
486  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
487  */
488  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
489  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
490  state->rs_oldest_xmin))
491  {
492  /*
493  * Okay, this is B in an update pair. See if we've seen A.
494  */
495  UnresolvedTup unresolved;
496 
497  memset(&hashkey, 0, sizeof(hashkey));
498  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
499  hashkey.tid = old_tid;
500 
501  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
502  HASH_FIND, NULL);
503 
504  if (unresolved != NULL)
505  {
506  /*
507  * We have seen and memorized the previous tuple already. Now
508  * that we know where we inserted the tuple its t_ctid points
509  * to, fix its t_ctid and insert it to the new heap.
510  */
511  if (free_new)
512  heap_freetuple(new_tuple);
513  new_tuple = unresolved->tuple;
514  free_new = true;
515  old_tid = unresolved->old_tid;
516  new_tuple->t_data->t_ctid = new_tid;
517 
518  /*
519  * We don't need the hash entry anymore, but don't free its
520  * tuple just yet.
521  */
522  hash_search(state->rs_unresolved_tups, &hashkey,
523  HASH_REMOVE, &found);
524  Assert(found);
525 
526  /* loop back to insert the previous tuple in the chain */
527  continue;
528  }
529  else
530  {
531  /*
532  * Remember the new tid of this tuple. We'll use it to set the
533  * ctid when we find the previous tuple in the chain.
534  */
535  OldToNewMapping mapping;
536 
537  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
538  HASH_ENTER, &found);
539  Assert(!found);
540 
541  mapping->new_tid = new_tid;
542  }
543  }
544 
545  /* Done with this (chain of) tuples, for now */
546  if (free_new)
547  heap_freetuple(new_tuple);
548  break;
549  }
550 
551  MemoryContextSwitchTo(old_cxt);
552 }
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:365
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:680
HeapTupleFields t_heap
Definition: htup_details.h:156
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1035
TransactionId rs_freeze_xid
Definition: rewriteheap.c:142
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
MultiXactId rs_cutoff_multi
Definition: rewriteheap.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define HEAP2_XACT_MASK
Definition: htup_details.h:283
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:445
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
#define HEAP_UPDATED
Definition: htup_details.h:209
ItemPointerData old_tid
Definition: rewriteheap.c:175
Form_pg_class rd_rel
Definition: rel.h:110
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
union HeapTupleHeaderData::@43 t_choice
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData new_tid
Definition: rewriteheap.c:184
#define HEAP_XMAX_INVALID
Definition: htup_details.h:207
ItemPointerData t_ctid
Definition: htup_details.h:160
HTAB * rs_unresolved_tups
Definition: rewriteheap.c:151
ItemPointerData t_self
Definition: htup.h:65
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId cutoff_xid, TransactionId cutoff_multi)
Definition: heapam.c:6674
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId rs_oldest_xmin
Definition: rewriteheap.c:140
MemoryContext rs_cxt
Definition: rewriteheap.c:148
ItemPointerData tid
Definition: rewriteheap.c:166
#define Assert(condition)
Definition: c.h:804
HTAB * rs_old_new_tid_map
Definition: rewriteheap.c:152
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:313
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:29
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:172
TransactionId xmin
Definition: rewriteheap.c:165
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:614
#define HEAP_XACT_MASK
Definition: htup_details.h:218
Relation rs_old_rel
Definition: rewriteheap.c:134
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:187