PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupDataUnresolvedTup
 
typedef OldToNewMappingDataOldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 187 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 179 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 236 of file rewriteheap.c.

238 {
240  MemoryContext rw_cxt;
241  MemoryContext old_cxt;
242  HASHCTL hash_ctl;
243 
244  /*
245  * To ease cleanup, make a separate context that will contain the
246  * RewriteState struct itself plus all subsidiary data.
247  */
249  "Table rewrite",
251  old_cxt = MemoryContextSwitchTo(rw_cxt);
252 
253  /* Create and fill in the state struct */
254  state = palloc0(sizeof(RewriteStateData));
255 
256  state->rs_old_rel = old_heap;
257  state->rs_new_rel = new_heap;
258  state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
259  /* new_heap needn't be empty, just locked */
260  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
261  state->rs_buffer_valid = false;
262  state->rs_oldest_xmin = oldest_xmin;
263  state->rs_freeze_xid = freeze_xid;
264  state->rs_cutoff_multi = cutoff_multi;
265  state->rs_cxt = rw_cxt;
266 
267  /* Initialize hash tables used to track update chains */
268  hash_ctl.keysize = sizeof(TidHashKey);
269  hash_ctl.entrysize = sizeof(UnresolvedTupData);
270  hash_ctl.hcxt = state->rs_cxt;
271 
272  state->rs_unresolved_tups =
273  hash_create("Rewrite / Unresolved ctids",
274  128, /* arbitrary initial size */
275  &hash_ctl,
277 
278  hash_ctl.entrysize = sizeof(OldToNewMappingData);
279 
280  state->rs_old_new_tid_map =
281  hash_create("Rewrite / Old to new tid map",
282  128, /* arbitrary initial size */
283  &hash_ctl,
285 
286  MemoryContextSwitchTo(old_cxt);
287 
289 
290  return state;
291 }
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:227
Pointer Page
Definition: bufpage.h:78
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void * palloc0(Size size)
Definition: mcxt.c:1257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * palloc_aligned(Size size, Size alignto, int flags)
Definition: mcxt.c:1446
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
#define PG_IO_ALIGN_SIZE
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:793
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MemoryContextSwitchTo(), palloc0(), palloc_aligned(), PG_IO_ALIGN_SIZE, and RelationGetNumberOfBlocks.

Referenced by heapam_relation_copy_for_cluster().

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1189 of file rewriteheap.c.

1190 {
1191  XLogRecPtr cutoff;
1192  XLogRecPtr redo;
1193  DIR *mappings_dir;
1194  struct dirent *mapping_de;
1195  char path[MAXPGPATH + 20];
1196 
1197  /*
1198  * We start of with a minimum of the last redo pointer. No new decoding
1199  * slot will start before that, so that's a safe upper bound for removal.
1200  */
1201  redo = GetRedoRecPtr();
1202 
1203  /* now check for the restart ptrs from existing slots */
1205 
1206  /* don't start earlier than the restart lsn */
1207  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1208  cutoff = redo;
1209 
1210  mappings_dir = AllocateDir("pg_logical/mappings");
1211  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1212  {
1213  Oid dboid;
1214  Oid relid;
1215  XLogRecPtr lsn;
1216  TransactionId rewrite_xid;
1217  TransactionId create_xid;
1218  uint32 hi,
1219  lo;
1220  PGFileType de_type;
1221 
1222  if (strcmp(mapping_de->d_name, ".") == 0 ||
1223  strcmp(mapping_de->d_name, "..") == 0)
1224  continue;
1225 
1226  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1227  de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
1228 
1229  if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
1230  continue;
1231 
1232  /* Skip over files that cannot be ours. */
1233  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1234  continue;
1235 
1236  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1237  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1238  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1239 
1240  lsn = ((uint64) hi) << 32 | lo;
1241 
1242  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1243  {
1244  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1245  if (unlink(path) < 0)
1246  ereport(ERROR,
1248  errmsg("could not remove file \"%s\": %m", path)));
1249  }
1250  else
1251  {
1252  /* on some operating systems fsyncing a file requires O_RDWR */
1253  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1254 
1255  /*
1256  * The file cannot vanish due to concurrency since this function
1257  * is the only one removing logical mappings and only one
1258  * checkpoint can be in progress at a time.
1259  */
1260  if (fd < 0)
1261  ereport(ERROR,
1263  errmsg("could not open file \"%s\": %m", path)));
1264 
1265  /*
1266  * We could try to avoid fsyncing files that either haven't
1267  * changed or have only been created since the checkpoint's start,
1268  * but it's currently not deemed worth the effort.
1269  */
1270  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1271  if (pg_fsync(fd) != 0)
1274  errmsg("could not fsync file \"%s\": %m", path)));
1276 
1277  if (CloseTransientFile(fd) != 0)
1278  ereport(ERROR,
1280  errmsg("could not close file \"%s\": %m", path)));
1281  }
1282  }
1283  FreeDir(mappings_dir);
1284 
1285  /* persist directory entries to disk */
1286  fsync_fname("pg_logical/mappings", true);
1287 }
unsigned int uint32
Definition: c.h:495
#define PG_BINARY
Definition: c.h:1283
uint32 TransactionId
Definition: c.h:641
int errcode_for_file_access(void)
Definition: elog.c:881
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2854
int FreeDir(DIR *dir)
Definition: fd.c:2906
int CloseTransientFile(int fd)
Definition: fd.c:2754
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:708
int data_sync_elevel(int elevel)
Definition: fd.c:3881
int pg_fsync(int fd)
Definition: fd.c:361
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2578
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2788
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:525
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_REG
Definition: file_utils.h:22
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define MAXPGPATH
#define snprintf
Definition: port.h:238
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:942
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6051
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), fsync_fname(), get_dirent_type(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PGFILETYPE_ERROR, PGFILETYPE_REG, pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), and snprintf.

Referenced by CheckPointGuts().

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 299 of file rewriteheap.c.

300 {
301  HASH_SEQ_STATUS seq_status;
302  UnresolvedTup unresolved;
303 
304  /*
305  * Write any remaining tuples in the UnresolvedTups table. If we have any
306  * left, they should in fact be dead, but let's err on the safe side.
307  */
308  hash_seq_init(&seq_status, state->rs_unresolved_tups);
309 
310  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
311  {
312  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
313  raw_heap_insert(state, unresolved->tuple);
314  }
315 
316  /* Write the last page, if any */
317  if (state->rs_buffer_valid)
318  {
319  if (RelationNeedsWAL(state->rs_new_rel))
320  log_newpage(&state->rs_new_rel->rd_locator,
321  MAIN_FORKNUM,
322  state->rs_blockno,
323  state->rs_buffer,
324  true);
325 
326  PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
327 
329  state->rs_blockno, state->rs_buffer, true);
330  }
331 
332  /*
333  * When we WAL-logged rel pages, we must nonetheless fsync them. The
334  * reason is the same as in storage.c's RelationCopyStorage(): we're
335  * writing data that's not in shared buffers, and so a CHECKPOINT
336  * occurring during the rewriteheap operation won't have fsync'd data we
337  * wrote before the checkpoint.
338  */
339  if (RelationNeedsWAL(state->rs_new_rel))
341 
343 
344  /* Deleting the context frees everything */
345  MemoryContextDelete(state->rs_cxt);
346 }
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1542
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:572
#define RelationNeedsWAL(relation)
Definition: rel.h:629
@ MAIN_FORKNUM
Definition: relpath.h:50
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:612
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:939
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:721
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
Definition: smgr.c:498
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_ctid
Definition: htup_details.h:161
XLogRecPtr log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno, Page page, bool page_std)
Definition: xloginsert.c:1131

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid(), log_newpage(), logical_end_heap_rewrite(), MAIN_FORKNUM, MemoryContextDelete(), PageSetChecksumInplace(), raw_heap_insert(), RelationGetSmgr(), RelationNeedsWAL, smgrextend(), smgrimmedsync(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState r)

Definition at line 1107 of file rewriteheap.c.

1108 {
1109  char path[MAXPGPATH];
1110  int fd;
1111  xl_heap_rewrite_mapping *xlrec;
1112  uint32 len;
1113  char *data;
1114 
1115  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1116 
1117  snprintf(path, MAXPGPATH,
1118  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1119  xlrec->mapped_db, xlrec->mapped_rel,
1120  LSN_FORMAT_ARGS(xlrec->start_lsn),
1121  xlrec->mapped_xid, XLogRecGetXid(r));
1122 
1123  fd = OpenTransientFile(path,
1124  O_CREAT | O_WRONLY | PG_BINARY);
1125  if (fd < 0)
1126  ereport(ERROR,
1128  errmsg("could not create file \"%s\": %m", path)));
1129 
1130  /*
1131  * Truncate all data that's not guaranteed to have been safely fsynced (by
1132  * previous record or by the last checkpoint).
1133  */
1134  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1135  if (ftruncate(fd, xlrec->offset) != 0)
1136  ereport(ERROR,
1138  errmsg("could not truncate file \"%s\" to %u: %m",
1139  path, (uint32) xlrec->offset)));
1141 
1142  data = XLogRecGetData(r) + sizeof(*xlrec);
1143 
1144  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1145 
1146  /* write out tail end of mapping file (again) */
1147  errno = 0;
1148  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1149  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1150  {
1151  /* if write didn't set errno, assume problem is no disk space */
1152  if (errno == 0)
1153  errno = ENOSPC;
1154  ereport(ERROR,
1156  errmsg("could not write to file \"%s\": %m", path)));
1157  }
1159 
1160  /*
1161  * Now fsync all previously written data. We could improve things and only
1162  * do this for the last write to a file, but the required bookkeeping
1163  * doesn't seem worth the trouble.
1164  */
1165  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1166  if (pg_fsync(fd) != 0)
1169  errmsg("could not fsync file \"%s\": %m", path)));
1171 
1172  if (CloseTransientFile(fd) != 0)
1173  ereport(ERROR,
1175  errmsg("could not close file \"%s\": %m", path)));
1176 }
const void size_t len
const void * data
#define pg_pwrite
Definition: port.h:226
struct LogicalRewriteMappingData LogicalRewriteMappingData
TransactionId mapped_xid
Definition: heapam_xlog.h:396
#define ftruncate(a, b)
Definition: win32_port.h:82
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:412

References CloseTransientFile(), data, data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, len, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite, pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 793 of file rewriteheap.c.

794 {
795  HASHCTL hash_ctl;
796  TransactionId logical_xmin;
797 
798  /*
799  * We only need to persist these mappings if the rewritten table can be
800  * accessed during logical decoding, if not, we can skip doing any
801  * additional work.
802  */
803  state->rs_logical_rewrite =
805 
806  if (!state->rs_logical_rewrite)
807  return;
808 
809  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
810 
811  /*
812  * If there are no logical slots in progress we don't need to do anything,
813  * there cannot be any remappings for relevant rows yet. The relation's
814  * lock protects us against races.
815  */
816  if (logical_xmin == InvalidTransactionId)
817  {
818  state->rs_logical_rewrite = false;
819  return;
820  }
821 
822  state->rs_logical_xmin = logical_xmin;
823  state->rs_begin_lsn = GetXLogInsertRecPtr();
824  state->rs_num_rewrite_mappings = 0;
825 
826  hash_ctl.keysize = sizeof(TransactionId);
827  hash_ctl.entrysize = sizeof(RewriteMappingFile);
828  hash_ctl.hcxt = state->rs_cxt;
829 
830  state->rs_logical_mappings =
831  hash_create("Logical rewrite mapping",
832  128, /* arbitrary initial size */
833  &hash_ctl,
835 }
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3872
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:685
struct RewriteMappingFile RewriteMappingFile
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:8923

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), and RelationIsAccessibleInLogicalDecoding.

Referenced by begin_heap_rewrite().

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 939 of file rewriteheap.c.

940 {
941  HASH_SEQ_STATUS seq_status;
942  RewriteMappingFile *src;
943 
944  /* done, no logical rewrite in progress */
945  if (!state->rs_logical_rewrite)
946  return;
947 
948  /* writeout remaining in-memory entries */
949  if (state->rs_num_rewrite_mappings > 0)
951 
952  /* Iterate over all mappings we have written and fsync the files. */
953  hash_seq_init(&seq_status, state->rs_logical_mappings);
954  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
955  {
956  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
959  errmsg("could not fsync file \"%s\": %m", src->path)));
960  FileClose(src->vfd);
961  }
962  /* memory context cleanup will deal with the rest */
963 }
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2242
void FileClose(File file)
Definition: fd.c:1930
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:841
char path[MAXPGPATH]
Definition: rewriteheap.c:199

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, and RewriteMappingFile::vfd.

Referenced by end_heap_rewrite().

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 841 of file rewriteheap.c.

842 {
843  HASH_SEQ_STATUS seq_status;
844  RewriteMappingFile *src;
845  dlist_mutable_iter iter;
846 
847  Assert(state->rs_logical_rewrite);
848 
849  /* no logical rewrite in progress, no need to iterate over mappings */
850  if (state->rs_num_rewrite_mappings == 0)
851  return;
852 
853  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
854  state->rs_num_rewrite_mappings);
855 
856  hash_seq_init(&seq_status, state->rs_logical_mappings);
857  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
858  {
859  char *waldata;
860  char *waldata_start;
862  Oid dboid;
863  uint32 len;
864  int written;
865  uint32 num_mappings = dclist_count(&src->mappings);
866 
867  /* this file hasn't got any new mappings */
868  if (num_mappings == 0)
869  continue;
870 
871  if (state->rs_old_rel->rd_rel->relisshared)
872  dboid = InvalidOid;
873  else
874  dboid = MyDatabaseId;
875 
876  xlrec.num_mappings = num_mappings;
877  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
878  xlrec.mapped_xid = src->xid;
879  xlrec.mapped_db = dboid;
880  xlrec.offset = src->off;
881  xlrec.start_lsn = state->rs_begin_lsn;
882 
883  /* write all mappings consecutively */
884  len = num_mappings * sizeof(LogicalRewriteMappingData);
885  waldata_start = waldata = palloc(len);
886 
887  /*
888  * collect data we need to write out, but don't modify ondisk data yet
889  */
890  dclist_foreach_modify(iter, &src->mappings)
891  {
893 
894  pmap = dclist_container(RewriteMappingDataEntry, node, iter.cur);
895 
896  memcpy(waldata, &pmap->map, sizeof(pmap->map));
897  waldata += sizeof(pmap->map);
898 
899  /* remove from the list and free */
900  dclist_delete_from(&src->mappings, &pmap->node);
901  pfree(pmap);
902 
903  /* update bookkeeping */
904  state->rs_num_rewrite_mappings--;
905  }
906 
907  Assert(dclist_count(&src->mappings) == 0);
908  Assert(waldata == waldata_start + len);
909 
910  /*
911  * Note that we deviate from the usual WAL coding practices here,
912  * check the above "Logical rewrite support" comment for reasoning.
913  */
914  written = FileWrite(src->vfd, waldata_start, len, src->off,
915  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
916  if (written != len)
917  ereport(ERROR,
919  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
920  written, len)));
921  src->off += len;
922 
923  XLogBeginInsert();
924  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
925  XLogRegisterData(waldata_start, len);
926 
927  /* write xlog record */
928  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
929 
930  pfree(waldata_start);
931  }
932  Assert(state->rs_num_rewrite_mappings == 0);
933 }
int FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:2144
Oid MyDatabaseId
Definition: globals.c:89
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:53
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
#define dclist_foreach_modify(iter, lhead)
Definition: ilist.h:973
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1456
void * palloc(Size size)
Definition: mcxt.c:1226
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:504
LogicalRewriteMappingData map
Definition: rewriteheap.c:208
TransactionId xid
Definition: rewriteheap.c:195
dclist_head mappings
Definition: rewriteheap.c:198
dlist_node * cur
Definition: ilist.h:200
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:351
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:461
void XLogBeginInsert(void)
Definition: xloginsert.c:150

References Assert(), dlist_mutable_iter::cur, dclist_container, dclist_count(), dclist_delete_from(), dclist_foreach_modify, DEBUG1, elog(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, len, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationGetRelid, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1033 of file rewriteheap.c.

1035 {
1036  ItemPointerData new_tid = new_tuple->t_self;
1037  TransactionId cutoff = state->rs_logical_xmin;
1038  TransactionId xmin;
1039  TransactionId xmax;
1040  bool do_log_xmin = false;
1041  bool do_log_xmax = false;
1043 
1044  /* no logical rewrite in progress, we don't need to log anything */
1045  if (!state->rs_logical_rewrite)
1046  return;
1047 
1048  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1049  /* use *GetUpdateXid to correctly deal with multixacts */
1050  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1051 
1052  /*
1053  * Log the mapping iff the tuple has been created recently.
1054  */
1055  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1056  do_log_xmin = true;
1057 
1058  if (!TransactionIdIsNormal(xmax))
1059  {
1060  /*
1061  * no xmax is set, can't have any permanent ones, so this check is
1062  * sufficient
1063  */
1064  }
1065  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1066  {
1067  /* only locked, we don't care */
1068  }
1069  else if (!TransactionIdPrecedes(xmax, cutoff))
1070  {
1071  /* tuple has been deleted recently, log */
1072  do_log_xmax = true;
1073  }
1074 
1075  /* if neither needs to be logged, we're done */
1076  if (!do_log_xmin && !do_log_xmax)
1077  return;
1078 
1079  /* fill out mapping information */
1080  map.old_locator = state->rs_old_rel->rd_locator;
1081  map.old_tid = old_tid;
1082  map.new_locator = state->rs_new_rel->rd_locator;
1083  map.new_tid = new_tid;
1084 
1085  /* ---
1086  * Now persist the mapping for the individual xids that are affected. We
1087  * need to log for both xmin and xmax if they aren't the same transaction
1088  * since the mapping files are per "affected" xid.
1089  * We don't muster all that much effort detecting whether xmin and xmax
1090  * are actually the same transaction, we just check whether the xid is the
1091  * same disregarding subtransactions. Logging too much is relatively
1092  * harmless and we could never do the check fully since subtransaction
1093  * data is thrown away during restarts.
1094  * ---
1095  */
1096  if (do_log_xmin)
1097  logical_rewrite_log_mapping(state, xmin, &map);
1098  /* separately log mapping for xmax unless it'd be redundant */
1099  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1100  logical_rewrite_log_mapping(state, xmax, &map);
1101 }
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:227
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:309
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:361
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:969
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData map 
)
static

Definition at line 969 of file rewriteheap.c.

971 {
972  RewriteMappingFile *src;
974  Oid relid;
975  bool found;
976 
977  relid = RelationGetRelid(state->rs_old_rel);
978 
979  /* look for existing mappings for this 'mapped' xid */
980  src = hash_search(state->rs_logical_mappings, &xid,
981  HASH_ENTER, &found);
982 
983  /*
984  * We haven't yet had the need to map anything for this xid, create
985  * per-xid data structures.
986  */
987  if (!found)
988  {
989  char path[MAXPGPATH];
990  Oid dboid;
991 
992  if (state->rs_old_rel->rd_rel->relisshared)
993  dboid = InvalidOid;
994  else
995  dboid = MyDatabaseId;
996 
997  snprintf(path, MAXPGPATH,
998  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
999  dboid, relid,
1000  LSN_FORMAT_ARGS(state->rs_begin_lsn),
1001  xid, GetCurrentTransactionId());
1002 
1003  dclist_init(&src->mappings);
1004  src->off = 0;
1005  memcpy(src->path, path, sizeof(path));
1006  src->vfd = PathNameOpenFile(path,
1007  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1008  if (src->vfd < 0)
1009  ereport(ERROR,
1011  errmsg("could not create file \"%s\": %m", path)));
1012  }
1013 
1014  pmap = MemoryContextAlloc(state->rs_cxt,
1015  sizeof(RewriteMappingDataEntry));
1016  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1017  dclist_push_tail(&src->mappings, &pmap->node);
1018  state->rs_num_rewrite_mappings++;
1019 
1020  /*
1021  * Write out buffer every time we've too many in-memory entries across all
1022  * mapping files.
1023  */
1024  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1026 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1527
@ HASH_ENTER
Definition: hsearch.h:114
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:445

References dclist_init(), dclist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationGetRelid, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 612 of file rewriteheap.c.

613 {
614  Page page = state->rs_buffer;
615  Size pageFreeSpace,
616  saveFreeSpace;
617  Size len;
618  OffsetNumber newoff;
619  HeapTuple heaptup;
620 
621  /*
622  * If the new tuple is too big for storage or contains already toasted
623  * out-of-line attributes from some other relation, invoke the toaster.
624  *
625  * Note: below this point, heaptup is the data we actually intend to store
626  * into the relation; tup is the caller's original untoasted data.
627  */
628  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
629  {
630  /* toast table entries should never be recursively toasted */
632  heaptup = tup;
633  }
634  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
635  {
637 
638  /*
639  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
640  * for the TOAST table are not logically decoded. The main heap is
641  * WAL-logged as XLOG FPI records, which are not logically decoded.
642  */
644 
645  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
646  options);
647  }
648  else
649  heaptup = tup;
650 
651  len = MAXALIGN(heaptup->t_len); /* be conservative */
652 
653  /*
654  * If we're gonna fail for oversize tuple, do it right away
655  */
656  if (len > MaxHeapTupleSize)
657  ereport(ERROR,
658  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
659  errmsg("row is too big: size %zu, maximum size %zu",
660  len, MaxHeapTupleSize)));
661 
662  /* Compute desired extra freespace due to fillfactor option */
663  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
665 
666  /* Now we can check to see if there's enough free space already. */
667  if (state->rs_buffer_valid)
668  {
669  pageFreeSpace = PageGetHeapFreeSpace(page);
670 
671  if (len + saveFreeSpace > pageFreeSpace)
672  {
673  /*
674  * Doesn't fit, so write out the existing page. It always
675  * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
676  * enforce saveFreeSpace unconditionally.
677  */
678 
679  /* XLOG stuff */
680  if (RelationNeedsWAL(state->rs_new_rel))
681  log_newpage(&state->rs_new_rel->rd_locator,
682  MAIN_FORKNUM,
683  state->rs_blockno,
684  page,
685  true);
686 
687  /*
688  * Now write the page. We say skipFsync = true because there's no
689  * need for smgr to schedule an fsync for this write; we'll do it
690  * ourselves in end_heap_rewrite.
691  */
692  PageSetChecksumInplace(page, state->rs_blockno);
693 
695  state->rs_blockno, page, true);
696 
697  state->rs_blockno++;
698  state->rs_buffer_valid = false;
699  }
700  }
701 
702  if (!state->rs_buffer_valid)
703  {
704  /* Initialize a new empty page */
705  PageInit(page, BLCKSZ, 0);
706  state->rs_buffer_valid = true;
707  }
708 
709  /* And now we can insert the tuple into the page */
710  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
711  InvalidOffsetNumber, false, true);
712  if (newoff == InvalidOffsetNumber)
713  elog(ERROR, "failed to add tuple");
714 
715  /* Update caller's t_self to the actual position where it was stored */
716  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
717 
718  /*
719  * Insert the correct position into CTID of the stored tuple, too, if the
720  * caller didn't supply a valid CTID.
721  */
722  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
723  {
724  ItemId newitemid;
725  HeapTupleHeader onpage_tup;
726 
727  newitemid = PageGetItemId(page, newoff);
728  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
729 
730  onpage_tup->t_ctid = tup->t_self;
731  }
732 
733  /* If heaptup is a private copy, release it. */
734  if (heaptup != tup)
735  heap_freetuple(heaptup);
736 }
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:991
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
#define MAXALIGN(LEN)
Definition: c.h:800
size_t Size
Definition: c.h:594
int errcode(int sqlerrcode)
Definition: elog.c:858
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:34
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:36
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1426
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:671
#define MaxHeapTupleSize
Definition: htup_details.h:558
Pointer Item
Definition: item.h:17
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:377
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:348
uint32 t_len
Definition: htup.h:64

References Assert(), elog(), ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, InvalidOffsetNumber, ItemPointerIsValid(), ItemPointerSet(), len, log_newpage(), MAIN_FORKNUM, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem(), PageGetItemId(), PageInit(), PageSetChecksumInplace(), RelationGetSmgr(), RelationGetTargetPageFreeSpace, RelationNeedsWAL, smgrextend(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 562 of file rewriteheap.c.

563 {
564  /*
565  * If we have already seen an earlier tuple in the update chain that
566  * points to this tuple, let's forget about that earlier tuple. It's in
567  * fact dead as well, our simple xmax < OldestXmin test in
568  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
569  * when xmin of a tuple is greater than xmax, which sounds
570  * counter-intuitive but is perfectly valid.
571  *
572  * We don't bother to try to detect the situation the other way round,
573  * when we encounter the dead tuple first and then the recently dead one
574  * that points to it. If that happens, we'll have some unmatched entries
575  * in the UnresolvedTups hash table at the end. That can happen anyway,
576  * because a vacuum might have removed the dead tuple in the chain before
577  * us.
578  */
579  UnresolvedTup unresolved;
580  TidHashKey hashkey;
581  bool found;
582 
583  memset(&hashkey, 0, sizeof(hashkey));
584  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
585  hashkey.tid = old_tuple->t_self;
586 
587  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
588  HASH_FIND, NULL);
589 
590  if (unresolved != NULL)
591  {
592  /* Need to free the contained tuple as well as the hashtable entry */
593  heap_freetuple(unresolved->tuple);
594  hash_search(state->rs_unresolved_tups, &hashkey,
595  HASH_REMOVE, &found);
596  Assert(found);
597  return true;
598  }
599 
600  return false;
601 }
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
TransactionId xmin
Definition: rewriteheap.c:165
ItemPointerData tid
Definition: rewriteheap.c:166

References Assert(), HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 360 of file rewriteheap.c.

362 {
363  MemoryContext old_cxt;
364  ItemPointerData old_tid;
365  TidHashKey hashkey;
366  bool found;
367  bool free_new;
368 
369  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
370 
371  /*
372  * Copy the original tuple's visibility information into new_tuple.
373  *
374  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
375  * we intentionally clear the HOT status bits.
376  */
377  memcpy(&new_tuple->t_data->t_choice.t_heap,
378  &old_tuple->t_data->t_choice.t_heap,
379  sizeof(HeapTupleFields));
380 
381  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
382  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
383  new_tuple->t_data->t_infomask |=
384  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
385 
386  /*
387  * While we have our hands on the tuple, we may as well freeze any
388  * eligible xmin or xmax, so that future VACUUM effort can be saved.
389  */
390  heap_freeze_tuple(new_tuple->t_data,
391  state->rs_old_rel->rd_rel->relfrozenxid,
392  state->rs_old_rel->rd_rel->relminmxid,
393  state->rs_freeze_xid,
394  state->rs_cutoff_multi);
395 
396  /*
397  * Invalid ctid means that ctid should point to the tuple itself. We'll
398  * override it later if the tuple is part of an update chain.
399  */
400  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
401 
402  /*
403  * If the tuple has been updated, check the old-to-new mapping hash table.
404  */
405  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
406  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
408  !(ItemPointerEquals(&(old_tuple->t_self),
409  &(old_tuple->t_data->t_ctid))))
410  {
411  OldToNewMapping mapping;
412 
413  memset(&hashkey, 0, sizeof(hashkey));
414  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
415  hashkey.tid = old_tuple->t_data->t_ctid;
416 
417  mapping = (OldToNewMapping)
418  hash_search(state->rs_old_new_tid_map, &hashkey,
419  HASH_FIND, NULL);
420 
421  if (mapping != NULL)
422  {
423  /*
424  * We've already copied the tuple that t_ctid points to, so we can
425  * set the ctid of this tuple to point to the new location, and
426  * insert it right away.
427  */
428  new_tuple->t_data->t_ctid = mapping->new_tid;
429 
430  /* We don't need the mapping entry anymore */
431  hash_search(state->rs_old_new_tid_map, &hashkey,
432  HASH_REMOVE, &found);
433  Assert(found);
434  }
435  else
436  {
437  /*
438  * We haven't seen the tuple t_ctid points to yet. Stash this
439  * tuple into unresolved_tups to be written later.
440  */
441  UnresolvedTup unresolved;
442 
443  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
444  HASH_ENTER, &found);
445  Assert(!found);
446 
447  unresolved->old_tid = old_tuple->t_self;
448  unresolved->tuple = heap_copytuple(new_tuple);
449 
450  /*
451  * We can't do anything more now, since we don't know where the
452  * tuple will be written.
453  */
454  MemoryContextSwitchTo(old_cxt);
455  return;
456  }
457  }
458 
459  /*
460  * Now we will write the tuple, and then check to see if it is the B tuple
461  * in any new or known pair. When we resolve a known pair, we will be
462  * able to write that pair's A tuple, and then we have to check if it
463  * resolves some other pair. Hence, we need a loop here.
464  */
465  old_tid = old_tuple->t_self;
466  free_new = false;
467 
468  for (;;)
469  {
470  ItemPointerData new_tid;
471 
472  /* Insert the tuple and find out where it's put in new_heap */
473  raw_heap_insert(state, new_tuple);
474  new_tid = new_tuple->t_self;
475 
476  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
477 
478  /*
479  * If the tuple is the updated version of a row, and the prior version
480  * wouldn't be DEAD yet, then we need to either resolve the prior
481  * version (if it's waiting in rs_unresolved_tups), or make an entry
482  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
483  * previous tuple's xmax would equal this one's xmin, so it's
484  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
485  */
486  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
488  state->rs_oldest_xmin))
489  {
490  /*
491  * Okay, this is B in an update pair. See if we've seen A.
492  */
493  UnresolvedTup unresolved;
494 
495  memset(&hashkey, 0, sizeof(hashkey));
496  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
497  hashkey.tid = old_tid;
498 
499  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
500  HASH_FIND, NULL);
501 
502  if (unresolved != NULL)
503  {
504  /*
505  * We have seen and memorized the previous tuple already. Now
506  * that we know where we inserted the tuple its t_ctid points
507  * to, fix its t_ctid and insert it to the new heap.
508  */
509  if (free_new)
510  heap_freetuple(new_tuple);
511  new_tuple = unresolved->tuple;
512  free_new = true;
513  old_tid = unresolved->old_tid;
514  new_tuple->t_data->t_ctid = new_tid;
515 
516  /*
517  * We don't need the hash entry anymore, but don't free its
518  * tuple just yet.
519  */
520  hash_search(state->rs_unresolved_tups, &hashkey,
521  HASH_REMOVE, &found);
522  Assert(found);
523 
524  /* loop back to insert the previous tuple in the chain */
525  continue;
526  }
527  else
528  {
529  /*
530  * Remember the new tid of this tuple. We'll use it to set the
531  * ctid when we find the previous tuple in the chain.
532  */
533  OldToNewMapping mapping;
534 
535  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
536  HASH_ENTER, &found);
537  Assert(!found);
538 
539  mapping->new_tid = new_tid;
540  }
541  }
542 
543  /* Done with this (chain of) tuples, for now */
544  if (free_new)
545  heap_freetuple(new_tuple);
546  break;
547  }
548 
549  MemoryContextSwitchTo(old_cxt);
550 }
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
Definition: heapam.c:6915
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:768
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:444
#define HEAP2_XACT_MASK
Definition: htup_details.h:279
#define HEAP_XACT_MASK
Definition: htup_details.h:215
#define HEAP_XMAX_INVALID
Definition: htup_details.h:208
#define HEAP_UPDATED
Definition: htup_details.h:210
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:35
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:1033
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:187
union HeapTupleHeaderData::@45 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:157
ItemPointerData new_tid
Definition: rewriteheap.c:184
ItemPointerData old_tid
Definition: rewriteheap.c:175

References Assert(), HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid(), logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().