PostgreSQL Source Code  git master
rewriteheap.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 185 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 177 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 234 of file rewriteheap.c.

236 {
237  RewriteState state;
238  MemoryContext rw_cxt;
239  MemoryContext old_cxt;
240  HASHCTL hash_ctl;
241 
242  /*
243  * To ease cleanup, make a separate context that will contain the
244  * RewriteState struct itself plus all subsidiary data.
245  */
246  rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
247  "Table rewrite",
248  ALLOCSET_DEFAULT_SIZES);
249  old_cxt = MemoryContextSwitchTo(rw_cxt);
250 
251  /* Create and fill in the state struct */
252  state = palloc0(sizeof(RewriteStateData));
253 
254  state->rs_old_rel = old_heap;
255  state->rs_new_rel = new_heap;
256  state->rs_buffer = NULL;
257  /* new_heap needn't be empty, just locked */
258  state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
259  state->rs_oldest_xmin = oldest_xmin;
260  state->rs_freeze_xid = freeze_xid;
261  state->rs_cutoff_multi = cutoff_multi;
262  state->rs_cxt = rw_cxt;
263  state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
264 
265  /* Initialize hash tables used to track update chains */
266  hash_ctl.keysize = sizeof(TidHashKey);
267  hash_ctl.entrysize = sizeof(UnresolvedTupData);
268  hash_ctl.hcxt = state->rs_cxt;
269 
270  state->rs_unresolved_tups =
271  hash_create("Rewrite / Unresolved ctids",
272  128, /* arbitrary initial size */
273  &hash_ctl,
274  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
275 
276  hash_ctl.entrysize = sizeof(OldToNewMappingData);
277 
278  state->rs_old_new_tid_map =
279  hash_create("Rewrite / Old to new tid map",
280  128, /* arbitrary initial size */
281  &hash_ctl,
282  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
283 
284  MemoryContextSwitchTo(old_cxt);
285 
286  logical_begin_heap_rewrite(state);
287 
288  return state;
289 }
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:281
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
Definition: bulk_write.c:86
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void * palloc0(Size size)
Definition: mcxt.c:1346
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
MemoryContextSwitchTo(old_ctx)
@ MAIN_FORKNUM
Definition: relpath.h:50
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:759
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MAIN_FORKNUM, MemoryContextSwitchTo(), palloc0(), RelationGetNumberOfBlocks, and smgr_bulk_start_rel().

Referenced by heapam_relation_copy_for_cluster().

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1155 of file rewriteheap.c.

1156 {
1157  XLogRecPtr cutoff;
1158  XLogRecPtr redo;
1159  DIR *mappings_dir;
1160  struct dirent *mapping_de;
1161  char path[MAXPGPATH + 20];
1162 
1163  /*
1164  * We start off with a minimum of the last redo pointer. No new decoding
1165  * slot will start before that, so that's a safe upper bound for removal.
1166  */
1167  redo = GetRedoRecPtr();
1168 
1169  /* now check for the restart ptrs from existing slots */
1170  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1171 
1172  /* don't start earlier than the restart lsn */
1173  if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1174  cutoff = redo;
1175 
1176  mappings_dir = AllocateDir("pg_logical/mappings");
1177  while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1178  {
1179  Oid dboid;
1180  Oid relid;
1181  XLogRecPtr lsn;
1182  TransactionId rewrite_xid;
1183  TransactionId create_xid;
1184  uint32 hi,
1185  lo;
1186  PGFileType de_type;
1187 
1188  if (strcmp(mapping_de->d_name, ".") == 0 ||
1189  strcmp(mapping_de->d_name, "..") == 0)
1190  continue;
1191 
1192  snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1193  de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
1194 
1195  if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
1196  continue;
1197 
1198  /* Skip over files that cannot be ours. */
1199  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1200  continue;
1201 
1202  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1203  &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1204  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1205 
1206  lsn = ((uint64) hi) << 32 | lo;
1207 
1208  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1209  {
1210  elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1211  if (unlink(path) < 0)
1212  ereport(ERROR,
1213  (errcode_for_file_access(),
1214  errmsg("could not remove file \"%s\": %m", path)));
1215  }
1216  else
1217  {
1218  /* on some operating systems fsyncing a file requires O_RDWR */
1219  int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1220 
1221  /*
1222  * The file cannot vanish due to concurrency since this function
1223  * is the only one removing logical mappings and only one
1224  * checkpoint can be in progress at a time.
1225  */
1226  if (fd < 0)
1227  ereport(ERROR,
1228  (errcode_for_file_access(),
1229  errmsg("could not open file \"%s\": %m", path)));
1230 
1231  /*
1232  * We could try to avoid fsyncing files that either haven't
1233  * changed or have only been created since the checkpoint's start,
1234  * but it's currently not deemed worth the effort.
1235  */
1236  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1237  if (pg_fsync(fd) != 0)
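1238  ereport(data_sync_elevel(ERROR),
1239  (errcode_for_file_access(),
1240  errmsg("could not fsync file \"%s\": %m", path)));
1241  pgstat_report_wait_end();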
1242 
1243  if (CloseTransientFile(fd) != 0)
1244  ereport(ERROR,
1245  (errcode_for_file_access(),
1246  errmsg("could not close file \"%s\": %m", path)));
1247  }
1248  }
1249  FreeDir(mappings_dir);
1250 
1251  /* persist directory entries to disk */
1252  fsync_fname("pg_logical/mappings", true);
1253 }
unsigned int uint32
Definition: c.h:506
#define PG_BINARY
Definition: c.h:1273
uint32 TransactionId
Definition: c.h:652
int errcode_for_file_access(void)
Definition: elog.c:882
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2909
int FreeDir(DIR *dir)
Definition: fd.c:2961
int CloseTransientFile(int fd)
Definition: fd.c:2809
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
int data_sync_elevel(int elevel)
Definition: fd.c:3936
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2843
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:525
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_REG
Definition: file_utils.h:22
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define MAXPGPATH
#define snprintf
Definition: port.h:238
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1150
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6393
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), fsync_fname(), get_dirent_type(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PGFILETYPE_ERROR, PGFILETYPE_REG, pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), and snprintf.

Referenced by CheckPointGuts().

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 297 of file rewriteheap.c.

298 {
299  HASH_SEQ_STATUS seq_status;
300  UnresolvedTup unresolved;
301 
302  /*
303  * Write any remaining tuples in the UnresolvedTups table. If we have any
304  * left, they should in fact be dead, but let's err on the safe side.
305  */
306  hash_seq_init(&seq_status, state->rs_unresolved_tups);
307 
308  while ((unresolved = hash_seq_search(&seq_status)) != NULL)
309  {
310  ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
311  raw_heap_insert(state, unresolved->tuple);
312  }
313 
314  /* Write the last page, if any */
315  if (state->rs_buffer)
316  {
317  smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
318  state->rs_buffer = NULL;
319  }
320 
321  smgr_bulk_finish(state->rs_bulkstate);
322 
323  logical_end_heap_rewrite(state);
324 
325  /* Deleting the context frees everything */
326  MemoryContextDelete(state->rs_cxt);
327 }
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
Definition: bulk_write.c:271
void smgr_bulk_finish(BulkWriteState *bulkstate)
Definition: bulk_write.c:129
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:593
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:905
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_ctid
Definition: htup_details.h:161

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid(), logical_end_heap_rewrite(), MemoryContextDelete(), raw_heap_insert(), smgr_bulk_finish(), smgr_bulk_write(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1073 of file rewriteheap.c.

1074 {
1075  char path[MAXPGPATH];
1076  int fd;
1077  xl_heap_rewrite_mapping *xlrec;
1078  uint32 len;
1079  char *data;
1080 
1081  xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1082 
1083  snprintf(path, MAXPGPATH,
1084  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1085  xlrec->mapped_db, xlrec->mapped_rel,
1086  LSN_FORMAT_ARGS(xlrec->start_lsn),
1087  xlrec->mapped_xid, XLogRecGetXid(r));
1088 
1089  fd = OpenTransientFile(path,
1090  O_CREAT | O_WRONLY | PG_BINARY);
1091  if (fd < 0)
1092  ereport(ERROR,
1093  (errcode_for_file_access(),
1094  errmsg("could not create file \"%s\": %m", path)));
1095 
1096  /*
1097  * Truncate all data that's not guaranteed to have been safely fsynced (by
1098  * previous record or by the last checkpoint).
1099  */
1100  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1101  if (ftruncate(fd, xlrec->offset) != 0)
1102  ereport(ERROR,
1103  (errcode_for_file_access(),
1104  errmsg("could not truncate file \"%s\" to %u: %m",
1105  path, (uint32) xlrec->offset)));
1106  pgstat_report_wait_end();
1107 
1108  data = XLogRecGetData(r) + sizeof(*xlrec);
1109 
1110  len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1111 
1112  /* write out tail end of mapping file (again) */
1113  errno = 0;
1114  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1115  if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1116  {
1117  /* if write didn't set errno, assume problem is no disk space */
1118  if (errno == 0)
1119  errno = ENOSPC;
1120  ereport(ERROR,
1121  (errcode_for_file_access(),
1122  errmsg("could not write to file \"%s\": %m", path)));
1123  }
1124  pgstat_report_wait_end();
1125 
1126  /*
1127  * Now fsync all previously written data. We could improve things and only
1128  * do this for the last write to a file, but the required bookkeeping
1129  * doesn't seem worth the trouble.
1130  */
1131  pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1132  if (pg_fsync(fd) != 0)
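1133  ereport(data_sync_elevel(ERROR),
1134  (errcode_for_file_access(),
1135  errmsg("could not fsync file \"%s\": %m", path)));
1136  pgstat_report_wait_end();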
1137 
1138  if (CloseTransientFile(fd) != 0)
1139  ereport(ERROR,
1140  (errcode_for_file_access(),
1141  errmsg("could not close file \"%s\": %m", path)));
1142 }
const void size_t len
const void * data
#define pg_pwrite
Definition: port.h:226
struct LogicalRewriteMappingData LogicalRewriteMappingData
TransactionId mapped_xid
Definition: heapam_xlog.h:470
#define ftruncate(a, b)
Definition: win32_port.h:82
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:412

References CloseTransientFile(), data, data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ftruncate, len, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), pg_pwrite, pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 759 of file rewriteheap.c.

760 {
761  HASHCTL hash_ctl;
762  TransactionId logical_xmin;
763 
764  /*
765  * We only need to persist these mappings if the rewritten table can be
766  * accessed during logical decoding, if not, we can skip doing any
767  * additional work.
768  */
769  state->rs_logical_rewrite =
770  RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
771 
772  if (!state->rs_logical_rewrite)
773  return;
774 
775  ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
776 
777  /*
778  * If there are no logical slots in progress we don't need to do anything,
779  * there cannot be any remappings for relevant rows yet. The relation's
780  * lock protects us against races.
781  */
782  if (logical_xmin == InvalidTransactionId)
783  {
784  state->rs_logical_rewrite = false;
785  return;
786  }
787 
788  state->rs_logical_xmin = logical_xmin;
789  state->rs_begin_lsn = GetXLogInsertRecPtr();
790  state->rs_num_rewrite_mappings = 0;
791 
792  hash_ctl.keysize = sizeof(TransactionId);
793  hash_ctl.entrysize = sizeof(RewriteMappingFile);
794  hash_ctl.hcxt = state->rs_cxt;
795 
796  state->rs_logical_mappings =
797  hash_create("Logical rewrite mapping",
798  128, /* arbitrary initial size */
799  &hash_ctl,
801 }
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3952
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:684
struct RewriteMappingFile RewriteMappingFile
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9355

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), and RelationIsAccessibleInLogicalDecoding.

Referenced by begin_heap_rewrite().

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 905 of file rewriteheap.c.

906 {
907  HASH_SEQ_STATUS seq_status;
908  RewriteMappingFile *src;
909 
910  /* done, no logical rewrite in progress */
911  if (!state->rs_logical_rewrite)
912  return;
913 
914  /* writeout remaining in-memory entries */
915  if (state->rs_num_rewrite_mappings > 0)
916  logical_heap_rewrite_flush_mappings(state);
917 
918  /* Iterate over all mappings we have written and fsync the files. */
919  hash_seq_init(&seq_status, state->rs_logical_mappings);
920  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
921  {
922  if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
923  ereport(data_sync_elevel(ERROR),
924  (errcode_for_file_access(),
925  errmsg("could not fsync file \"%s\": %m", src->path)));
926  FileClose(src->vfd);
927  }
928  /* memory context cleanup will deal with the rest */
929 }
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2297
void FileClose(File file)
Definition: fd.c:1978
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:807
char path[MAXPGPATH]
Definition: rewriteheap.c:197

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, and RewriteMappingFile::vfd.

Referenced by end_heap_rewrite().

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 807 of file rewriteheap.c.

808 {
809  HASH_SEQ_STATUS seq_status;
810  RewriteMappingFile *src;
811  dlist_mutable_iter iter;
812 
813  Assert(state->rs_logical_rewrite);
814 
815  /* no logical rewrite in progress, no need to iterate over mappings */
816  if (state->rs_num_rewrite_mappings == 0)
817  return;
818 
819  elog(DEBUG1, "flushing %u logical rewrite mapping entries",
820  state->rs_num_rewrite_mappings);
821 
822  hash_seq_init(&seq_status, state->rs_logical_mappings);
823  while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
824  {
825  char *waldata;
826  char *waldata_start;
827  xl_heap_rewrite_mapping xlrec;
828  Oid dboid;
829  uint32 len;
830  int written;
831  uint32 num_mappings = dclist_count(&src->mappings);
832 
833  /* this file hasn't got any new mappings */
834  if (num_mappings == 0)
835  continue;
836 
837  if (state->rs_old_rel->rd_rel->relisshared)
838  dboid = InvalidOid;
839  else
840  dboid = MyDatabaseId;
841 
842  xlrec.num_mappings = num_mappings;
843  xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
844  xlrec.mapped_xid = src->xid;
845  xlrec.mapped_db = dboid;
846  xlrec.offset = src->off;
847  xlrec.start_lsn = state->rs_begin_lsn;
848 
849  /* write all mappings consecutively */
850  len = num_mappings * sizeof(LogicalRewriteMappingData);
851  waldata_start = waldata = palloc(len);
852 
853  /*
854  * collect data we need to write out, but don't modify ondisk data yet
855  */
856  dclist_foreach_modify(iter, &src->mappings)
857  {
858  RewriteMappingDataEntry *pmap;
859 
860  pmap = dclist_container(RewriteMappingDataEntry, node, iter.cur);
861 
862  memcpy(waldata, &pmap->map, sizeof(pmap->map));
863  waldata += sizeof(pmap->map);
864 
865  /* remove from the list and free */
866  dclist_delete_from(&src->mappings, &pmap->node);
867  pfree(pmap);
868 
869  /* update bookkeeping */
870  state->rs_num_rewrite_mappings--;
871  }
872 
873  Assert(dclist_count(&src->mappings) == 0);
874  Assert(waldata == waldata_start + len);
875 
876  /*
877  * Note that we deviate from the usual WAL coding practices here,
878  * check the above "Logical rewrite support" comment for reasoning.
879  */
880  written = FileWrite(src->vfd, waldata_start, len, src->off,
881  WAIT_EVENT_LOGICAL_REWRITE_WRITE);
882  if (written != len)
883  ereport(ERROR,
884  (errcode_for_file_access(),
885  errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
886  written, len)));
887  src->off += len;
888 
889  XLogBeginInsert();
890  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
891  XLogRegisterData(waldata_start, len);
892 
893  /* write xlog record */
894  XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
895 
896  pfree(waldata_start);
897  }
898  Assert(state->rs_num_rewrite_mappings == 0);
899 }
#define Assert(condition)
Definition: c.h:858
static ssize_t FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:208
Oid MyDatabaseId
Definition: globals.c:91
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:58
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
#define dclist_foreach_modify(iter, lhead)
Definition: ilist.h:973
void pfree(void *pointer)
Definition: mcxt.c:1520
void * palloc(Size size)
Definition: mcxt.c:1316
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:505
LogicalRewriteMappingData map
Definition: rewriteheap.c:206
TransactionId xid
Definition: rewriteheap.c:193
dclist_head mappings
Definition: rewriteheap.c:196
dlist_node * cur
Definition: ilist.h:200
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References Assert, dlist_mutable_iter::cur, dclist_container, dclist_count(), dclist_delete_from(), dclist_foreach_modify, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, len, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationGetRelid, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 999 of file rewriteheap.c.

1001 {
1002  ItemPointerData new_tid = new_tuple->t_self;
1003  TransactionId cutoff = state->rs_logical_xmin;
1004  TransactionId xmin;
1005  TransactionId xmax;
1006  bool do_log_xmin = false;
1007  bool do_log_xmax = false;
1008  LogicalRewriteMappingData map;
1009 
1010  /* no logical rewrite in progress, we don't need to log anything */
1011  if (!state->rs_logical_rewrite)
1012  return;
1013 
1014  xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1015  /* use *GetUpdateXid to correctly deal with multixacts */
1016  xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1017 
1018  /*
1019  * Log the mapping iff the tuple has been created recently.
1020  */
1021  if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1022  do_log_xmin = true;
1023 
1024  if (!TransactionIdIsNormal(xmax))
1025  {
1026  /*
1027  * no xmax is set, can't have any permanent ones, so this check is
1028  * sufficient
1029  */
1030  }
1031  else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1032  {
1033  /* only locked, we don't care */
1034  }
1035  else if (!TransactionIdPrecedes(xmax, cutoff))
1036  {
1037  /* tuple has been deleted recently, log */
1038  do_log_xmax = true;
1039  }
1040 
1041  /* if neither needs to be logged, we're done */
1042  if (!do_log_xmin && !do_log_xmax)
1043  return;
1044 
1045  /* fill out mapping information */
1046  map.old_locator = state->rs_old_rel->rd_locator;
1047  map.old_tid = old_tid;
1048  map.new_locator = state->rs_new_rel->rd_locator;
1049  map.new_tid = new_tid;
1050 
1051  /* ---
1052  * Now persist the mapping for the individual xids that are affected. We
1053  * need to log for both xmin and xmax if they aren't the same transaction
1054  * since the mapping files are per "affected" xid.
1055  * We don't muster all that much effort detecting whether xmin and xmax
1056  * are actually the same transaction, we just check whether the xid is the
1057  * same disregarding subtransactions. Logging too much is relatively
1058  * harmless and we could never do the check fully since subtransaction
1059  * data is thrown away during restarts.
1060  * ---
1061  */
1062  if (do_log_xmin)
1063  logical_rewrite_log_mapping(state, xmin, &map);
1064  /* separately log mapping for xmax unless it'd be redundant */
1065  if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1066  logical_rewrite_log_mapping(state, xmax, &map);
1067 }
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
Definition: htup_details.h:227
#define HeapTupleHeaderGetXmin(tup)
Definition: htup_details.h:309
#define HeapTupleHeaderGetUpdateXid(tup)
Definition: htup_details.h:361
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:935
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References HEAP_XMAX_IS_LOCKED_ONLY, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 935 of file rewriteheap.c.

937 {
938  RewriteMappingFile *src;
939  RewriteMappingDataEntry *pmap;
940  Oid relid;
941  bool found;
942 
943  relid = RelationGetRelid(state->rs_old_rel);
944 
945  /* look for existing mappings for this 'mapped' xid */
946  src = hash_search(state->rs_logical_mappings, &xid,
947  HASH_ENTER, &found);
948 
949  /*
950  * We haven't yet had the need to map anything for this xid, create
951  * per-xid data structures.
952  */
953  if (!found)
954  {
955  char path[MAXPGPATH];
956  Oid dboid;
957 
958  if (state->rs_old_rel->rd_rel->relisshared)
959  dboid = InvalidOid;
960  else
961  dboid = MyDatabaseId;
962 
963  snprintf(path, MAXPGPATH,
964  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
965  dboid, relid,
966  LSN_FORMAT_ARGS(state->rs_begin_lsn),
967  xid, GetCurrentTransactionId());
968 
969  dclist_init(&src->mappings);
970  src->off = 0;
971  memcpy(src->path, path, sizeof(path));
972  src->vfd = PathNameOpenFile(path,
973  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
974  if (src->vfd < 0)
975  ereport(ERROR,
976  (errcode_for_file_access(),
977  errmsg("could not create file \"%s\": %m", path)));
978  }
979 
980  pmap = MemoryContextAlloc(state->rs_cxt,
981  sizeof(RewriteMappingDataEntry));
982  memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
983  dclist_push_tail(&src->mappings, &pmap->node);
984  state->rs_num_rewrite_mappings++;
985 
986  /*
987  * Write out buffer every time we've too many in-memory entries across all
988  * mapping files.
989  */
990  if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
991  logical_heap_rewrite_flush_mappings(state);
992 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
@ HASH_ENTER
Definition: hsearch.h:114
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1180
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451

References dclist_init(), dclist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, RelationGetRelid, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 593 of file rewriteheap.c.

594 {
595  Page page;
596  Size pageFreeSpace,
597  saveFreeSpace;
598  Size len;
599  OffsetNumber newoff;
600  HeapTuple heaptup;
601 
602  /*
603  * If the new tuple is too big for storage or contains already toasted
604  * out-of-line attributes from some other relation, invoke the toaster.
605  *
606  * Note: below this point, heaptup is the data we actually intend to store
607  * into the relation; tup is the caller's original untoasted data.
608  */
609  if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
610  {
611  /* toast table entries should never be recursively toasted */
612  Assert(!HeapTupleHasExternal(tup));
613  heaptup = tup;
614  }
615  else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
616  {
617  int options = HEAP_INSERT_SKIP_FSM;
618 
619  /*
620  * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
621  * for the TOAST table are not logically decoded. The main heap is
622  * WAL-logged as XLOG FPI records, which are not logically decoded.
623  */
624  options |= HEAP_INSERT_NO_LOGICAL;
625 
626  heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
627  options);
628  }
629  else
630  heaptup = tup;
631 
632  len = MAXALIGN(heaptup->t_len); /* be conservative */
633 
634  /*
635  * If we're gonna fail for oversize tuple, do it right away
636  */
637  if (len > MaxHeapTupleSize)
638  ereport(ERROR,
639  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
640  errmsg("row is too big: size %zu, maximum size %zu",
641  len, MaxHeapTupleSize)));
642 
643  /* Compute desired extra freespace due to fillfactor option */
644  saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
645  HEAP_DEFAULT_FILLFACTOR);
646 
647  /* Now we can check to see if there's enough free space already. */
648  page = (Page) state->rs_buffer;
649  if (page)
650  {
651  pageFreeSpace = PageGetHeapFreeSpace(page);
652 
653  if (len + saveFreeSpace > pageFreeSpace)
654  {
655  /*
656  * Doesn't fit, so write out the existing page. It always
657  * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
658  * enforce saveFreeSpace unconditionally.
659  */
660  smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
661  state->rs_buffer = NULL;
662  page = NULL;
663  state->rs_blockno++;
664  }
665  }
666 
667  if (!page)
668  {
669  /* Initialize a new empty page */
670  state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
671  page = (Page) state->rs_buffer;
672  PageInit(page, BLCKSZ, 0);
673  }
674 
675  /* And now we can insert the tuple into the page */
676  newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
677  InvalidOffsetNumber, false, true);
678  if (newoff == InvalidOffsetNumber)
679  elog(ERROR, "failed to add tuple");
680 
681  /* Update caller's t_self to the actual position where it was stored */
682  ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
683 
684  /*
685  * Insert the correct position into CTID of the stored tuple, too, if the
686  * caller didn't supply a valid CTID.
687  */
688  if (!ItemPointerIsValid(&tup->t_data->t_ctid))
689  {
690  ItemId newitemid;
691  HeapTupleHeader onpage_tup;
692 
693  newitemid = PageGetItemId(page, newoff);
694  onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
695 
696  onpage_tup->t_ctid = tup->t_self;
697  }
698 
699  /* If heaptup is a private copy, release it. */
700  if (heaptup != tup)
701  heap_freetuple(heaptup);
702 }
Size PageGetHeapFreeSpace(Page page)
Definition: bufpage.c:991
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42
Pointer Page
Definition: bufpage.h:78
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:351
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:240
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:468
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
Definition: bulk_write.c:295
#define MAXALIGN(LEN)
Definition: c.h:811
size_t Size
Definition: c.h:605
int errcode(int sqlerrcode)
Definition: elog.c:859
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:35
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:37
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define HeapTupleHasExternal(tuple)
Definition: htup_details.h:671
#define MaxHeapTupleSize
Definition: htup_details.h:558
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Pointer Item
Definition: item.h:17
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:378
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:349
uint32 t_len
Definition: htup.h:64

References Assert, elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal, if(), InvalidOffsetNumber, ItemPointerIsValid(), ItemPointerSet(), len, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem(), PageGetItemId(), PageInit(), RelationGetTargetPageFreeSpace, smgr_bulk_get_buf(), smgr_bulk_write(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 543 of file rewriteheap.c.

544 {
545  /*
546  * If we have already seen an earlier tuple in the update chain that
547  * points to this tuple, let's forget about that earlier tuple. It's in
548  * fact dead as well, our simple xmax < OldestXmin test in
549  * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
550  * when xmin of a tuple is greater than xmax, which sounds
551  * counter-intuitive but is perfectly valid.
552  *
553  * We don't bother to try to detect the situation the other way round,
554  * when we encounter the dead tuple first and then the recently dead one
555  * that points to it. If that happens, we'll have some unmatched entries
556  * in the UnresolvedTups hash table at the end. That can happen anyway,
557  * because a vacuum might have removed the dead tuple in the chain before
558  * us.
559  */
560  UnresolvedTup unresolved;
561  TidHashKey hashkey;
562  bool found;
563 
564  memset(&hashkey, 0, sizeof(hashkey));
565  hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
566  hashkey.tid = old_tuple->t_self;
567 
568  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
569  HASH_FIND, NULL);
570 
571  if (unresolved != NULL)
572  {
573  /* Need to free the contained tuple as well as the hashtable entry */
574  heap_freetuple(unresolved->tuple);
575  hash_search(state->rs_unresolved_tups, &hashkey,
576  HASH_REMOVE, &found);
577  Assert(found);
578  return true;
579  }
580 
581  return false;
582 }
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
TransactionId xmin
Definition: rewriteheap.c:163
ItemPointerData tid
Definition: rewriteheap.c:164

References Assert, HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin, HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 341 of file rewriteheap.c.

343 {
344  MemoryContext old_cxt;
345  ItemPointerData old_tid;
346  TidHashKey hashkey;
347  bool found;
348  bool free_new;
349 
350  old_cxt = MemoryContextSwitchTo(state->rs_cxt);
351 
352  /*
353  * Copy the original tuple's visibility information into new_tuple.
354  *
355  * XXX we might later need to copy some t_infomask2 bits, too? Right now,
356  * we intentionally clear the HOT status bits.
357  */
358  memcpy(&new_tuple->t_data->t_choice.t_heap,
359  &old_tuple->t_data->t_choice.t_heap,
360  sizeof(HeapTupleFields));
361 
362  new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
363  new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
364  new_tuple->t_data->t_infomask |=
365  old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
366 
367  /*
368  * While we have our hands on the tuple, we may as well freeze any
369  * eligible xmin or xmax, so that future VACUUM effort can be saved.
370  */
371  heap_freeze_tuple(new_tuple->t_data,
372  state->rs_old_rel->rd_rel->relfrozenxid,
373  state->rs_old_rel->rd_rel->relminmxid,
374  state->rs_freeze_xid,
375  state->rs_cutoff_multi);
376 
377  /*
378  * Invalid ctid means that ctid should point to the tuple itself. We'll
379  * override it later if the tuple is part of an update chain.
380  */
381  ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
382 
383  /*
384  * If the tuple has been updated, check the old-to-new mapping hash table.
385  */
386  if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
387  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
388  !HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
389  !(ItemPointerEquals(&(old_tuple->t_self),
390  &(old_tuple->t_data->t_ctid))))
391  {
392  OldToNewMapping mapping;
393 
394  memset(&hashkey, 0, sizeof(hashkey));
395  hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
396  hashkey.tid = old_tuple->t_data->t_ctid;
397 
398  mapping = (OldToNewMapping)
399  hash_search(state->rs_old_new_tid_map, &hashkey,
400  HASH_FIND, NULL);
401 
402  if (mapping != NULL)
403  {
404  /*
405  * We've already copied the tuple that t_ctid points to, so we can
406  * set the ctid of this tuple to point to the new location, and
407  * insert it right away.
408  */
409  new_tuple->t_data->t_ctid = mapping->new_tid;
410 
411  /* We don't need the mapping entry anymore */
412  hash_search(state->rs_old_new_tid_map, &hashkey,
413  HASH_REMOVE, &found);
414  Assert(found);
415  }
416  else
417  {
418  /*
419  * We haven't seen the tuple t_ctid points to yet. Stash this
420  * tuple into unresolved_tups to be written later.
421  */
422  UnresolvedTup unresolved;
423 
424  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
425  HASH_ENTER, &found);
426  Assert(!found);
427 
428  unresolved->old_tid = old_tuple->t_self;
429  unresolved->tuple = heap_copytuple(new_tuple);
430 
431  /*
432  * We can't do anything more now, since we don't know where the
433  * tuple will be written.
434  */
435  MemoryContextSwitchTo(old_cxt);
436  return;
437  }
438  }
439 
440  /*
441  * Now we will write the tuple, and then check to see if it is the B tuple
442  * in any new or known pair. When we resolve a known pair, we will be
443  * able to write that pair's A tuple, and then we have to check if it
444  * resolves some other pair. Hence, we need a loop here.
445  */
446  old_tid = old_tuple->t_self;
447  free_new = false;
448 
449  for (;;)
450  {
451  ItemPointerData new_tid;
452 
453  /* Insert the tuple and find out where it's put in new_heap */
454  raw_heap_insert(state, new_tuple);
455  new_tid = new_tuple->t_self;
456 
457  logical_rewrite_heap_tuple(state, old_tid, new_tuple);
458 
459  /*
460  * If the tuple is the updated version of a row, and the prior version
461  * wouldn't be DEAD yet, then we need to either resolve the prior
462  * version (if it's waiting in rs_unresolved_tups), or make an entry
463  * in rs_old_new_tid_map (so we can resolve it when we do see it). The
464  * previous tuple's xmax would equal this one's xmin, so it's
465  * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
466  */
467  if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
468  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
469  state->rs_oldest_xmin))
470  {
471  /*
472  * Okay, this is B in an update pair. See if we've seen A.
473  */
474  UnresolvedTup unresolved;
475 
476  memset(&hashkey, 0, sizeof(hashkey));
477  hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
478  hashkey.tid = old_tid;
479 
480  unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
481  HASH_FIND, NULL);
482 
483  if (unresolved != NULL)
484  {
485  /*
486  * We have seen and memorized the previous tuple already. Now
487  * that we know where we inserted the tuple its t_ctid points
488  * to, fix its t_ctid and insert it to the new heap.
489  */
490  if (free_new)
491  heap_freetuple(new_tuple);
492  new_tuple = unresolved->tuple;
493  free_new = true;
494  old_tid = unresolved->old_tid;
495  new_tuple->t_data->t_ctid = new_tid;
496 
497  /*
498  * We don't need the hash entry anymore, but don't free its
499  * tuple just yet.
500  */
501  hash_search(state->rs_unresolved_tups, &hashkey,
502  HASH_REMOVE, &found);
503  Assert(found);
504 
505  /* loop back to insert the previous tuple in the chain */
506  continue;
507  }
508  else
509  {
510  /*
511  * Remember the new tid of this tuple. We'll use it to set the
512  * ctid when we find the previous tuple in the chain.
513  */
514  OldToNewMapping mapping;
515 
516  mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
517  HASH_ENTER, &found);
518  Assert(!found);
519 
520  mapping->new_tid = new_tid;
521  }
522  }
523 
524  /* Done with this (chain of) tuples, for now */
525  if (free_new)
526  heap_freetuple(new_tuple);
527  break;
528  }
529 
530  MemoryContextSwitchTo(old_cxt);
531 }
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
Definition: heapam.c:6913
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:776
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
Definition: htup_details.h:444
#define HEAP2_XACT_MASK
Definition: htup_details.h:279
#define HEAP_XACT_MASK
Definition: htup_details.h:215
#define HEAP_XMAX_INVALID
Definition: htup_details.h:208
#define HEAP_UPDATED
Definition: htup_details.h:210
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:35
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:999
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:185
HeapTupleFields t_heap
Definition: htup_details.h:157
union HeapTupleHeaderData::@48 t_choice
ItemPointerData new_tid
Definition: rewriteheap.c:182
ItemPointerData old_tid
Definition: rewriteheap.c:173

References Assert, HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), HEAP2_XACT_MASK, heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid, HeapTupleHeaderGetXmin, HeapTupleHeaderIndicatesMovedPartitions, HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid(), logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().