PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rewriteheap.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

Definition at line 185 of file rewriteheap.c.

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Definition at line 177 of file rewriteheap.c.

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 234 of file rewriteheap.c.

236{
237 RewriteState state;
238 MemoryContext rw_cxt;
239 MemoryContext old_cxt;
240 HASHCTL hash_ctl;
241
242 /*
243 * To ease cleanup, make a separate context that will contain the
244 * RewriteState struct itself plus all subsidiary data.
245 */
246 rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
247 "Table rewrite",
248 ALLOCSET_DEFAULT_SIZES);
249 old_cxt = MemoryContextSwitchTo(rw_cxt);
250
251 /* Create and fill in the state struct */
252 state = palloc0(sizeof(RewriteStateData));
253
254 state->rs_old_rel = old_heap;
255 state->rs_new_rel = new_heap;
256 state->rs_buffer = NULL;
257 /* new_heap needn't be empty, just locked */
258 state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
259 state->rs_oldest_xmin = oldest_xmin;
260 state->rs_freeze_xid = freeze_xid;
261 state->rs_cutoff_multi = cutoff_multi;
262 state->rs_cxt = rw_cxt;
263 state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
264
265 /* Initialize hash tables used to track update chains */
266 hash_ctl.keysize = sizeof(TidHashKey);
267 hash_ctl.entrysize = sizeof(UnresolvedTupData);
268 hash_ctl.hcxt = state->rs_cxt;
269
270 state->rs_unresolved_tups =
271 hash_create("Rewrite / Unresolved ctids",
272 128, /* arbitrary initial size */
273 &hash_ctl,
274 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
275
276 hash_ctl.entrysize = sizeof(OldToNewMappingData);
277
278 state->rs_old_new_tid_map =
279 hash_create("Rewrite / Old to new tid map",
280 128, /* arbitrary initial size */
281 &hash_ctl,
282 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
283
284 MemoryContextSwitchTo(old_cxt);
285
286 logical_begin_heap_rewrite(state);
287
288 return state;
289}
#define RelationGetNumberOfBlocks(reln)
Definition: bufmgr.h:274
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
Definition: bulk_write.c:87
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void * palloc0(Size size)
Definition: mcxt.c:1347
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
@ MAIN_FORKNUM
Definition: relpath.h:58
static void logical_begin_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:759
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: regguts.h:323

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, logical_begin_heap_rewrite(), MAIN_FORKNUM, MemoryContextSwitchTo(), palloc0(), RelationGetNumberOfBlocks, and smgr_bulk_start_rel().

Referenced by heapam_relation_copy_for_cluster().

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1155 of file rewriteheap.c.

1156{
1157 XLogRecPtr cutoff;
1158 XLogRecPtr redo;
1159 DIR *mappings_dir;
1160 struct dirent *mapping_de;
1161 char path[MAXPGPATH + sizeof(PG_LOGICAL_MAPPINGS_DIR)];
1162
1163 /*
1164 * We start off with a minimum of the last redo pointer. No new decoding
1165 * slot will start before that, so that's a safe upper bound for removal.
1166 */
1167 redo = GetRedoRecPtr();
1168
1169 /* now check for the restart ptrs from existing slots */
1170 cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1171
1172 /* don't start earlier than the restart lsn */
1173 if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1174 cutoff = redo;
1175
1176 mappings_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
1177 while ((mapping_de = ReadDir(mappings_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
1178 {
1179 Oid dboid;
1180 Oid relid;
1181 XLogRecPtr lsn;
1182 TransactionId rewrite_xid;
1183 TransactionId create_xid;
1184 uint32 hi,
1185 lo;
1186 PGFileType de_type;
1187
1188 if (strcmp(mapping_de->d_name, ".") == 0 ||
1189 strcmp(mapping_de->d_name, "..") == 0)
1190 continue;
1191
1192 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_MAPPINGS_DIR, mapping_de->d_name);
1193 de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
1194
1195 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
1196 continue;
1197
1198 /* Skip over files that cannot be ours. */
1199 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1200 continue;
1201
1202 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1203 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1204 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1205
1206 lsn = ((uint64) hi) << 32 | lo;
1207
1208 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1209 {
1210 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1211 if (unlink(path) < 0)
1212 ereport(ERROR,
1213 (errcode_for_file_access(),
1214 errmsg("could not remove file \"%s\": %m", path)));
1215 }
1216 else
1217 {
1218 /* on some operating systems fsyncing a file requires O_RDWR */
1219 int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1220
1221 /*
1222 * The file cannot vanish due to concurrency since this function
1223 * is the only one removing logical mappings and only one
1224 * checkpoint can be in progress at a time.
1225 */
1226 if (fd < 0)
1227 ereport(ERROR,
1228 (errcode_for_file_access(),
1229 errmsg("could not open file \"%s\": %m", path)));
1230
1231 /*
1232 * We could try to avoid fsyncing files that either haven't
1233 * changed or have only been created since the checkpoint's start,
1234 * but it's currently not deemed worth the effort.
1235 */
1236 pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
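1237 if (pg_fsync(fd) != 0)
1238 ereport(data_sync_elevel(ERROR),
1239 (errcode_for_file_access(),
1240 errmsg("could not fsync file \"%s\": %m", path)));
1241 pgstat_report_wait_end();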
1242
1243 if (CloseTransientFile(fd) != 0)
1244 ereport(ERROR,
1245 (errcode_for_file_access(),
1246 errmsg("could not close file \"%s\": %m", path)));
1247 }
1248 }
1249 FreeDir(mappings_dir);
1250
1251 /* persist directory entries to disk */
1252 fsync_fname(PG_LOGICAL_MAPPINGS_DIR, true);
1253}
#define PG_BINARY
Definition: c.h:1244
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
int FreeDir(DIR *dir)
Definition: fd.c:2985
int CloseTransientFile(int fd)
Definition: fd.c:2833
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:755
int data_sync_elevel(int elevel)
Definition: fd.c:3961
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2867
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2933
int pg_fsync(int fd)
Definition: fd.c:385
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2657
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_REG
Definition: file_utils.h:22
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define MAXPGPATH
#define snprintf
Definition: port.h:239
unsigned int Oid
Definition: postgres_ext.h:30
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1205
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6483
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AllocateDir(), CloseTransientFile(), dirent::d_name, data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FreeDir(), fsync_fname(), get_dirent_type(), GetRedoRecPtr(), InvalidXLogRecPtr, LOGICAL_REWRITE_FORMAT, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, PGFILETYPE_ERROR, PGFILETYPE_REG, pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), and snprintf.

Referenced by CheckPointGuts().

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 297 of file rewriteheap.c.

298{
299 HASH_SEQ_STATUS seq_status;
300 UnresolvedTup unresolved;
301
302 /*
303 * Write any remaining tuples in the UnresolvedTups table. If we have any
304 * left, they should in fact be dead, but let's err on the safe side.
305 */
306 hash_seq_init(&seq_status, state->rs_unresolved_tups);
307
308 while ((unresolved = hash_seq_search(&seq_status)) != NULL)
309 {
310 ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
311 raw_heap_insert(state, unresolved->tuple);
312 }
313
314 /* Write the last page, if any */
315 if (state->rs_buffer)
316 {
317 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
318 state->rs_buffer = NULL;
319 }
320
321 smgr_bulk_finish(state->rs_bulkstate);
322
323 logical_end_heap_rewrite(state);
324
325 /* Deleting the context frees everything */
326 MemoryContextDelete(state->rs_cxt);
327}
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
Definition: bulk_write.c:323
void smgr_bulk_finish(BulkWriteState *bulkstate)
Definition: bulk_write.c:130
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
static void raw_heap_insert(RewriteState state, HeapTuple tup)
Definition: rewriteheap.c:593
static void logical_end_heap_rewrite(RewriteState state)
Definition: rewriteheap.c:905
HeapTupleHeader t_data
Definition: htup.h:68
ItemPointerData t_ctid
Definition: htup_details.h:161

References hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid(), logical_end_heap_rewrite(), MemoryContextDelete(), raw_heap_insert(), smgr_bulk_finish(), smgr_bulk_write(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, and UnresolvedTupData::tuple.

Referenced by heapam_relation_copy_for_cluster().

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState * r)

Definition at line 1073 of file rewriteheap.c.

1074{
1075 char path[MAXPGPATH];
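1076 int fd;
1077 xl_heap_rewrite_mapping *xlrec;
1078 uint32 len;
1079 char *data;
1080
1081 xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1082
1083 snprintf(path, MAXPGPATH,
1084 "%s/" LOGICAL_REWRITE_FORMAT,
1085 PG_LOGICAL_MAPPINGS_DIR, xlrec->mapped_db, xlrec->mapped_rel,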
1086 LSN_FORMAT_ARGS(xlrec->start_lsn),
1087 xlrec->mapped_xid, XLogRecGetXid(r));
1088
1089 fd = OpenTransientFile(path,
1090 O_CREAT | O_WRONLY | PG_BINARY);
1091 if (fd < 0)
1092 ereport(ERROR,
1093 (errcode_for_file_access(),
1094 errmsg("could not create file \"%s\": %m", path)));
1095
1096 /*
1097 * Truncate all data that's not guaranteed to have been safely fsynced (by
1098 * previous record or by the last checkpoint).
1099 */
1100 pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1101 if (ftruncate(fd, xlrec->offset) != 0)
1102 ereport(ERROR,
1103 (errcode_for_file_access(),
1104 errmsg("could not truncate file \"%s\" to %u: %m",
1105 path, (uint32) xlrec->offset)));
1106 pgstat_report_wait_end();
1107
1108 data = XLogRecGetData(r) + sizeof(*xlrec);
1109
1110 len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1111
1112 /* write out tail end of mapping file (again) */
1113 errno = 0;
1114 pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1115 if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1116 {
1117 /* if write didn't set errno, assume problem is no disk space */
1118 if (errno == 0)
1119 errno = ENOSPC;
1120 ereport(ERROR,
1121 (errcode_for_file_access(),
1122 errmsg("could not write to file \"%s\": %m", path)));
1123 }
1124 pgstat_report_wait_end();
1125
1126 /*
1127 * Now fsync all previously written data. We could improve things and only
1128 * do this for the last write to a file, but the required bookkeeping
1129 * doesn't seem worth the trouble.
1130 */
1131 pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
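1132 if (pg_fsync(fd) != 0)
1133 ereport(data_sync_elevel(ERROR),
1134 (errcode_for_file_access(),
1135 errmsg("could not fsync file \"%s\": %m", path)));
1136 pgstat_report_wait_end();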
1137
1138 if (CloseTransientFile(fd) != 0)
1139 ereport(ERROR,
1140 (errcode_for_file_access(),
1141 errmsg("could not close file \"%s\": %m", path)));
1142}
const void size_t len
const void * data
#define pg_pwrite
Definition: port.h:227
struct LogicalRewriteMappingData LogicalRewriteMappingData
TransactionId mapped_xid
Definition: heapam_xlog.h:475
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:412

References CloseTransientFile(), data, data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), len, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, MAXPGPATH, xl_heap_rewrite_mapping::num_mappings, xl_heap_rewrite_mapping::offset, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, pg_pwrite, pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, xl_heap_rewrite_mapping::start_lsn, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 759 of file rewriteheap.c.

760{
761 HASHCTL hash_ctl;
762 TransactionId logical_xmin;
763
764 /*
765 * We only need to persist these mappings if the rewritten table can be
766 * accessed during logical decoding, if not, we can skip doing any
767 * additional work.
768 */
769 state->rs_logical_rewrite =
770 RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
771
772 if (!state->rs_logical_rewrite)
773 return;
774
775 ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
776
777 /*
778 * If there are no logical slots in progress we don't need to do anything,
779 * there cannot be any remappings for relevant rows yet. The relation's
780 * lock protects us against races.
781 */
782 if (logical_xmin == InvalidTransactionId)
783 {
784 state->rs_logical_rewrite = false;
785 return;
786 }
787
788 state->rs_logical_xmin = logical_xmin;
789 state->rs_begin_lsn = GetXLogInsertRecPtr();
790 state->rs_num_rewrite_mappings = 0;
791
792 hash_ctl.keysize = sizeof(TransactionId);
793 hash_ctl.entrysize = sizeof(RewriteMappingFile);
794 hash_ctl.hcxt = state->rs_cxt;
795
796 state->rs_logical_mappings =
797 hash_create("Logical rewrite mapping",
798 128, /* arbitrary initial size */
799 &hash_ctl,
800 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
801}
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3968
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition: rel.h:692
struct RewriteMappingFile RewriteMappingFile
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9470

References HASHCTL::entrysize, GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, HASHCTL::keysize, ProcArrayGetReplicationSlotXmin(), and RelationIsAccessibleInLogicalDecoding.

Referenced by begin_heap_rewrite().

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 905 of file rewriteheap.c.

906{
907 HASH_SEQ_STATUS seq_status;
908 RewriteMappingFile *src;
909
910 /* done, no logical rewrite in progress */
911 if (!state->rs_logical_rewrite)
912 return;
913
914 /* writeout remaining in-memory entries */
915 if (state->rs_num_rewrite_mappings > 0)
916 logical_heap_rewrite_flush_mappings(state);
917
918 /* Iterate over all mappings we have written and fsync the files. */
919 hash_seq_init(&seq_status, state->rs_logical_mappings);
920 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
921 {
922 if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
923 ereport(data_sync_elevel(ERROR),
924 (errcode_for_file_access(),
925 errmsg("could not fsync file \"%s\": %m", src->path)));
926 FileClose(src->vfd);
927 }
928 /* memory context cleanup will deal with the rest */
929}
int FileSync(File file, uint32 wait_event_info)
Definition: fd.c:2321
void FileClose(File file)
Definition: fd.c:1979
static void logical_heap_rewrite_flush_mappings(RewriteState state)
Definition: rewriteheap.c:807
char path[MAXPGPATH]
Definition: rewriteheap.c:197

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, and RewriteMappingFile::vfd.

Referenced by end_heap_rewrite().

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 807 of file rewriteheap.c.

808{
809 HASH_SEQ_STATUS seq_status;
810 RewriteMappingFile *src;
811 dlist_mutable_iter iter;
812
813 Assert(state->rs_logical_rewrite);
814
815 /* no logical rewrite in progress, no need to iterate over mappings */
816 if (state->rs_num_rewrite_mappings == 0)
817 return;
818
819 elog(DEBUG1, "flushing %u logical rewrite mapping entries",
820 state->rs_num_rewrite_mappings);
821
822 hash_seq_init(&seq_status, state->rs_logical_mappings);
823 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
824 {
825 char *waldata;
826 char *waldata_start;
827 xl_heap_rewrite_mapping xlrec;
828 Oid dboid;
829 uint32 len;
830 int written;
831 uint32 num_mappings = dclist_count(&src->mappings);
832
833 /* this file hasn't got any new mappings */
834 if (num_mappings == 0)
835 continue;
836
837 if (state->rs_old_rel->rd_rel->relisshared)
838 dboid = InvalidOid;
839 else
840 dboid = MyDatabaseId;
841
842 xlrec.num_mappings = num_mappings;
843 xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
844 xlrec.mapped_xid = src->xid;
845 xlrec.mapped_db = dboid;
846 xlrec.offset = src->off;
847 xlrec.start_lsn = state->rs_begin_lsn;
848
849 /* write all mappings consecutively */
850 len = num_mappings * sizeof(LogicalRewriteMappingData);
851 waldata_start = waldata = palloc(len);
852
853 /*
854 * collect data we need to write out, but don't modify ondisk data yet
855 */
856 dclist_foreach_modify(iter, &src->mappings)
857 {
858 RewriteMappingDataEntry *pmap;
859
860 pmap = dclist_container(RewriteMappingDataEntry, node, iter.cur);
861
862 memcpy(waldata, &pmap->map, sizeof(pmap->map));
863 waldata += sizeof(pmap->map);
864
865 /* remove from the list and free */
866 dclist_delete_from(&src->mappings, &pmap->node);
867 pfree(pmap);
868
869 /* update bookkeeping */
870 state->rs_num_rewrite_mappings--;
871 }
872
873 Assert(dclist_count(&src->mappings) == 0);
874 Assert(waldata == waldata_start + len);
875
876 /*
877 * Note that we deviate from the usual WAL coding practices here,
878 * check the above "Logical rewrite support" comment for reasoning.
879 */
880 written = FileWrite(src->vfd, waldata_start, len, src->off,
881 WAIT_EVENT_LOGICAL_REWRITE_WRITE);
882 if (written != len)
883 ereport(ERROR,
884 (errcode_for_file_access(),
885 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
886 written, len)));
887 src->off += len;
888
889 XLogBeginInsert();
890 XLogRegisterData(&xlrec, sizeof(xlrec));
891 XLogRegisterData(waldata_start, len);
892
893 /* write xlog record */
894 XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
895
896 pfree(waldata_start);
897 }
898 Assert(state->rs_num_rewrite_mappings == 0);
899}
static ssize_t FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:208
Oid MyDatabaseId
Definition: globals.c:93
Assert(PointerIsAligned(start, uint64))
#define XLOG_HEAP2_REWRITE
Definition: heapam_xlog.h:59
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
#define dclist_foreach_modify(iter, lhead)
Definition: ilist.h:973
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc(Size size)
Definition: mcxt.c:1317
#define InvalidOid
Definition: postgres_ext.h:35
#define RelationGetRelid(relation)
Definition: rel.h:513
LogicalRewriteMappingData map
Definition: rewriteheap.c:206
TransactionId xid
Definition: rewriteheap.c:193
dclist_head mappings
Definition: rewriteheap.c:196
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References Assert(), dlist_mutable_iter::cur, dclist_container, dclist_count(), dclist_delete_from(), dclist_foreach_modify, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, len, RewriteMappingDataEntry::map, xl_heap_rewrite_mapping::mapped_db, xl_heap_rewrite_mapping::mapped_rel, xl_heap_rewrite_mapping::mapped_xid, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingDataEntry::node, xl_heap_rewrite_mapping::num_mappings, RewriteMappingFile::off, xl_heap_rewrite_mapping::offset, palloc(), RewriteMappingFile::path, pfree(), RelationGetRelid, xl_heap_rewrite_mapping::start_lsn, RewriteMappingFile::vfd, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 999 of file rewriteheap.c.

1001{
1002 ItemPointerData new_tid = new_tuple->t_self;
1003 TransactionId cutoff = state->rs_logical_xmin;
1004 TransactionId xmin;
1005 TransactionId xmax;
1006 bool do_log_xmin = false;
1007 bool do_log_xmax = false;
1008 LogicalRewriteMappingData map;
1009
1010 /* no logical rewrite in progress, we don't need to log anything */
1011 if (!state->rs_logical_rewrite)
1012 return;
1013
1014 xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1015 /* use *GetUpdateXid to correctly deal with multixacts */
1016 xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1017
1018 /*
1019 * Log the mapping iff the tuple has been created recently.
1020 */
1021 if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1022 do_log_xmin = true;
1023
1024 if (!TransactionIdIsNormal(xmax))
1025 {
1026 /*
1027 * no xmax is set, can't have any permanent ones, so this check is
1028 * sufficient
1029 */
1030 }
1031 else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1032 {
1033 /* only locked, we don't care */
1034 }
1035 else if (!TransactionIdPrecedes(xmax, cutoff))
1036 {
1037 /* tuple has been deleted recently, log */
1038 do_log_xmax = true;
1039 }
1040
1041 /* if neither needs to be logged, we're done */
1042 if (!do_log_xmin && !do_log_xmax)
1043 return;
1044
1045 /* fill out mapping information */
1046 map.old_locator = state->rs_old_rel->rd_locator;
1047 map.old_tid = old_tid;
1048 map.new_locator = state->rs_new_rel->rd_locator;
1049 map.new_tid = new_tid;
1050
1051 /* ---
1052 * Now persist the mapping for the individual xids that are affected. We
1053 * need to log for both xmin and xmax if they aren't the same transaction
1054 * since the mapping files are per "affected" xid.
1055 * We don't muster all that much effort detecting whether xmin and xmax
1056 * are actually the same transaction, we just check whether the xid is the
1057 * same disregarding subtransactions. Logging too much is relatively
1058 * harmless and we could never do the check fully since subtransaction
1059 * data is thrown away during restarts.
1060 * ---
1061 */
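1062 if (do_log_xmin)
1063 logical_rewrite_log_mapping(state, xmin, &map);
1064 /* separately log mapping for xmax unless it'd be redundant */
1065 if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1066 logical_rewrite_log_mapping(state, xmax, &map);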
1067}
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
Definition: htup_details.h:226
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
Definition: htup_details.h:324
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
Definition: htup_details.h:397
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
Definition: rewriteheap.c:935
ItemPointerData t_self
Definition: htup.h:65
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References HEAP_XMAX_IS_LOCKED_ONLY(), HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, HeapTupleData::t_data, HeapTupleHeaderData::t_infomask, HeapTupleData::t_self, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData * map 
)
static

Definition at line 935 of file rewriteheap.c.

937{
938 RewriteMappingFile *src;
939 RewriteMappingDataEntry *pmap;
940 Oid relid;
941 bool found;
942
943 relid = RelationGetRelid(state->rs_old_rel);
944
945 /* look for existing mappings for this 'mapped' xid */
946 src = hash_search(state->rs_logical_mappings, &xid,
947 HASH_ENTER, &found);
948
949 /*
950 * We haven't yet had the need to map anything for this xid, create
951 * per-xid data structures.
952 */
953 if (!found)
954 {
955 char path[MAXPGPATH];
956 Oid dboid;
957
958 if (state->rs_old_rel->rd_rel->relisshared)
959 dboid = InvalidOid;
960 else
961 dboid = MyDatabaseId;
962
963 snprintf(path, MAXPGPATH,
964 "%s/" LOGICAL_REWRITE_FORMAT,
965 PG_LOGICAL_MAPPINGS_DIR, dboid, relid,
966 LSN_FORMAT_ARGS(state->rs_begin_lsn),
967 xid, GetCurrentTransactionId());
968
969 dclist_init(&src->mappings);
970 src->off = 0;
971 memcpy(src->path, path, sizeof(path));
972 src->vfd = PathNameOpenFile(path,
973 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
974 if (src->vfd < 0)
975 ereport(ERROR,
976 (errcode_for_file_access(),
977 errmsg("could not create file \"%s\": %m", path)));
978 }
979
980 pmap = MemoryContextAlloc(state->rs_cxt,
981 sizeof(RewriteMappingDataEntry));
982 memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
983 dclist_push_tail(&src->mappings, &pmap->node);
984 state->rs_num_rewrite_mappings++;
985
986 /*
987 * Write out buffer every time we've too many in-memory entries across all
988 * mapping files.
989 */
990 if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
991 logical_heap_rewrite_flush_mappings(state);
992}
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1576
@ HASH_ENTER
Definition: hsearch.h:114
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:454

References dclist_init(), dclist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingDataEntry::map, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingDataEntry::node, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, RelationGetRelid, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 593 of file rewriteheap.c.

594{
595 Page page;
596 Size pageFreeSpace,
597 saveFreeSpace;
598 Size len;
599 OffsetNumber newoff;
600 HeapTuple heaptup;
601
602 /*
603 * If the new tuple is too big for storage or contains already toasted
604 * out-of-line attributes from some other relation, invoke the toaster.
605 *
606 * Note: below this point, heaptup is the data we actually intend to store
607 * into the relation; tup is the caller's original untoasted data.
608 */
609 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
610 {
611 /* toast table entries should never be recursively toasted */
612 Assert(!HeapTupleHasExternal(tup));
613 heaptup = tup;
614 }
615 else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
616 {
617 int options = HEAP_INSERT_SKIP_FSM;
618
619 /*
620 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
621 * for the TOAST table are not logically decoded. The main heap is
622 * WAL-logged as XLOG FPI records, which are not logically decoded.
623 */
624 options |= HEAP_INSERT_NO_LOGICAL;
625
626 heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
627 options);
628 }
629 else
630 heaptup = tup;
631
632 len = MAXALIGN(heaptup->t_len); /* be conservative */
633
634 /*
635 * If we're gonna fail for oversize tuple, do it right away
636 */
637 if (len > MaxHeapTupleSize)
638 ereport(ERROR,
639 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
640 errmsg("row is too big: size %zu, maximum size %zu",
641 len, MaxHeapTupleSize)));
642
643 /* Compute desired extra freespace due to fillfactor option */
644 saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
645 HEAP_DEFAULT_FILLFACTOR);
646
647 /* Now we can check to see if there's enough free space already. */
648 page = (Page) state->rs_buffer;
649 if (page)
650 {
651 pageFreeSpace = PageGetHeapFreeSpace(page);
652
653 if (len + saveFreeSpace > pageFreeSpace)
654 {
655 /*
656 * Doesn't fit, so write out the existing page. It always
657 * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
658 * enforce saveFreeSpace unconditionally.
659 */
660 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
661 state->rs_buffer = NULL;
662 page = NULL;
663 state->rs_blockno++;
664 }
665 }
666
667 if (!page)
668 {
669 /* Initialize a new empty page */
670 state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
671 page = (Page) state->rs_buffer;
672 PageInit(page, BLCKSZ, 0);
673 }
674
675 /* And now we can insert the tuple into the page */
676 newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
677 InvalidOffsetNumber, false, true);
678 if (newoff == InvalidOffsetNumber)
679 elog(ERROR, "failed to add tuple");
680
681 /* Update caller's t_self to the actual position where it was stored */
682 ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
683
684 /*
685 * Insert the correct position into CTID of the stored tuple, too, if the
686 * caller didn't supply a valid CTID.
687 */
688 if (!ItemPointerIsValid(&tup->t_data->t_ctid))
689 {
690 ItemId newitemid;
691 HeapTupleHeader onpage_tup;
692
693 newitemid = PageGetItemId(page, newoff);
694 onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
695
696 onpage_tup->t_ctid = tup->t_self;
697 }
698
699 /* If heaptup is a private copy, release it. */
700 if (heaptup != tup)
701 heap_freetuple(heaptup);
702}
Size PageGetHeapFreeSpace(const PageData *page)
Definition: bufpage.c:980
void PageInit(Page page, Size pageSize, Size specialSize)
Definition: bufpage.c:42
static Item PageGetItem(const PageData *page, const ItemIdData *itemId)
Definition: bufpage.h:354
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:244
PageData * Page
Definition: bufpage.h:82
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:471
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
Definition: bulk_write.c:347
#define MAXALIGN(LEN)
Definition: c.h:782
size_t Size
Definition: c.h:576
int errcode(int sqlerrcode)
Definition: elog.c:853
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:36
#define HEAP_INSERT_NO_LOGICAL
Definition: heapam.h:38
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition: heaptoast.c:96
#define TOAST_TUPLE_THRESHOLD
Definition: heaptoast.h:48
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
Definition: htup_details.h:762
#define MaxHeapTupleSize
Definition: htup_details.h:610
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:78
Pointer Item
Definition: item.h:17
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition: rel.h:386
#define HEAP_DEFAULT_FILLFACTOR
Definition: rel.h:357
uint32 t_len
Definition: htup.h:64

References Assert(), elog, ereport, errcode(), errmsg(), ERROR, HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal(), if(), InvalidOffsetNumber, ItemPointerIsValid(), ItemPointerSet(), len, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem(), PageGetItemId(), PageInit(), RelationGetTargetPageFreeSpace, smgr_bulk_get_buf(), smgr_bulk_write(), HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 543 of file rewriteheap.c.

544{
545 /*
546 * If we have already seen an earlier tuple in the update chain that
547 * points to this tuple, let's forget about that earlier tuple. It's in
548 * fact dead as well, our simple xmax < OldestXmin test in
549 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
550 * when xmin of a tuple is greater than xmax, which sounds
551 * counter-intuitive but is perfectly valid.
552 *
553 * We don't bother to try to detect the situation the other way round,
554 * when we encounter the dead tuple first and then the recently dead one
555 * that points to it. If that happens, we'll have some unmatched entries
556 * in the UnresolvedTups hash table at the end. That can happen anyway,
557 * because a vacuum might have removed the dead tuple in the chain before
558 * us.
559 */
560 UnresolvedTup unresolved;
561 TidHashKey hashkey;
562 bool found;
563
564 memset(&hashkey, 0, sizeof(hashkey));
565 hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
566 hashkey.tid = old_tuple->t_self;
567
568 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
569 HASH_FIND, NULL);
570
571 if (unresolved != NULL)
572 {
573 /* Need to free the contained tuple as well as the hashtable entry */
574 heap_freetuple(unresolved->tuple);
575 hash_search(state->rs_unresolved_tups, &hashkey,
576 HASH_REMOVE, &found);
577 Assert(found);
578 return true;
579 }
580
581 return false;
582}
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
TransactionId xmin
Definition: rewriteheap.c:163
ItemPointerData tid
Definition: rewriteheap.c:164

References Assert(), HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), HeapTupleHeaderGetXmin(), HeapTupleData::t_data, HeapTupleData::t_self, TidHashKey::tid, UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by heapam_relation_copy_for_cluster().

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 341 of file rewriteheap.c.

343{
344 MemoryContext old_cxt;
345 ItemPointerData old_tid;
346 TidHashKey hashkey;
347 bool found;
348 bool free_new;
349
350 old_cxt = MemoryContextSwitchTo(state->rs_cxt);
351
352 /*
353 * Copy the original tuple's visibility information into new_tuple.
354 *
355 * XXX we might later need to copy some t_infomask2 bits, too? Right now,
356 * we intentionally clear the HOT status bits.
357 */
358 memcpy(&new_tuple->t_data->t_choice.t_heap,
359 &old_tuple->t_data->t_choice.t_heap,
360 sizeof(HeapTupleFields));
361
362 new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
363 new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
364 new_tuple->t_data->t_infomask |=
365 old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
366
367 /*
368 * While we have our hands on the tuple, we may as well freeze any
369 * eligible xmin or xmax, so that future VACUUM effort can be saved.
370 */
371 heap_freeze_tuple(new_tuple->t_data,
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
376
377 /*
378 * Invalid ctid means that ctid should point to the tuple itself. We'll
379 * override it later if the tuple is part of an update chain.
380 */
381 ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
382
383 /*
384 * If the tuple has been updated, check the old-to-new mapping hash table.
385 */
386 if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
387 HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
389 !(ItemPointerEquals(&(old_tuple->t_self),
390 &(old_tuple->t_data->t_ctid))))
391 {
392 OldToNewMapping mapping;
393
394 memset(&hashkey, 0, sizeof(hashkey));
395 hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
396 hashkey.tid = old_tuple->t_data->t_ctid;
397
398 mapping = (OldToNewMapping)
399 hash_search(state->rs_old_new_tid_map, &hashkey,
400 HASH_FIND, NULL);
401
402 if (mapping != NULL)
403 {
404 /*
405 * We've already copied the tuple that t_ctid points to, so we can
406 * set the ctid of this tuple to point to the new location, and
407 * insert it right away.
408 */
409 new_tuple->t_data->t_ctid = mapping->new_tid;
410
411 /* We don't need the mapping entry anymore */
412 hash_search(state->rs_old_new_tid_map, &hashkey,
413 HASH_REMOVE, &found);
414 Assert(found);
415 }
416 else
417 {
418 /*
419 * We haven't seen the tuple t_ctid points to yet. Stash this
420 * tuple into unresolved_tups to be written later.
421 */
422 UnresolvedTup unresolved;
423
424 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
425 HASH_ENTER, &found);
426 Assert(!found);
427
428 unresolved->old_tid = old_tuple->t_self;
429 unresolved->tuple = heap_copytuple(new_tuple);
430
431 /*
432 * We can't do anything more now, since we don't know where the
433 * tuple will be written.
434 */
435 MemoryContextSwitchTo(old_cxt);
436 return;
437 }
438 }
439
440 /*
441 * Now we will write the tuple, and then check to see if it is the B tuple
442 * in any new or known pair. When we resolve a known pair, we will be
443 * able to write that pair's A tuple, and then we have to check if it
444 * resolves some other pair. Hence, we need a loop here.
445 */
446 old_tid = old_tuple->t_self;
447 free_new = false;
448
449 for (;;)
450 {
451 ItemPointerData new_tid;
452
453 /* Insert the tuple and find out where it's put in new_heap */
454 raw_heap_insert(state, new_tuple);
455 new_tid = new_tuple->t_self;
456
457 logical_rewrite_heap_tuple(state, old_tid, new_tuple);
458
459 /*
460 * If the tuple is the updated version of a row, and the prior version
461 * wouldn't be DEAD yet, then we need to either resolve the prior
462 * version (if it's waiting in rs_unresolved_tups), or make an entry
463 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
464 * previous tuple's xmax would equal this one's xmin, so it's
465 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
466 */
467 if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
469 state->rs_oldest_xmin))
470 {
471 /*
472 * Okay, this is B in an update pair. See if we've seen A.
473 */
474 UnresolvedTup unresolved;
475
476 memset(&hashkey, 0, sizeof(hashkey));
477 hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
478 hashkey.tid = old_tid;
479
480 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
481 HASH_FIND, NULL);
482
483 if (unresolved != NULL)
484 {
485 /*
486 * We have seen and memorized the previous tuple already. Now
487 * that we know where we inserted the tuple its t_ctid points
488 * to, fix its t_ctid and insert it to the new heap.
489 */
490 if (free_new)
491 heap_freetuple(new_tuple);
492 new_tuple = unresolved->tuple;
493 free_new = true;
494 old_tid = unresolved->old_tid;
495 new_tuple->t_data->t_ctid = new_tid;
496
497 /*
498 * We don't need the hash entry anymore, but don't free its
499 * tuple just yet.
500 */
501 hash_search(state->rs_unresolved_tups, &hashkey,
502 HASH_REMOVE, &found);
503 Assert(found);
504
505 /* loop back to insert the previous tuple in the chain */
506 continue;
507 }
508 else
509 {
510 /*
511 * Remember the new tid of this tuple. We'll use it to set the
512 * ctid when we find the previous tuple in the chain.
513 */
514 OldToNewMapping mapping;
515
516 mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
517 HASH_ENTER, &found);
518 Assert(!found);
519
520 mapping->new_tid = new_tid;
521 }
522 }
523
524 /* Done with this (chain of) tuples, for now */
525 if (free_new)
526 heap_freetuple(new_tuple);
527 break;
528 }
529
530 MemoryContextSwitchTo(old_cxt);
531}
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
Definition: heapam.c:7391
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:778
#define HEAP_XACT_MASK
Definition: htup_details.h:215
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
Definition: htup_details.h:480
#define HEAP_XMAX_INVALID
Definition: htup_details.h:208
#define HEAP_UPDATED
Definition: htup_details.h:210
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
Definition: itemptr.c:35
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
Definition: rewriteheap.c:999
OldToNewMappingData * OldToNewMapping
Definition: rewriteheap.c:185
union HeapTupleHeaderData::@47 t_choice
HeapTupleFields t_heap
Definition: htup_details.h:157
ItemPointerData new_tid
Definition: rewriteheap.c:182
ItemPointerData old_tid
Definition: rewriteheap.c:173

References Assert(), HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), HeapTupleHeaderIndicatesMovedPartitions(), HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid(), logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), OldToNewMappingData::new_tid, UnresolvedTupData::old_tid, raw_heap_insert(), HeapTupleHeaderData::t_choice, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleHeaderData::t_heap, HeapTupleHeaderData::t_infomask, HeapTupleHeaderData::t_infomask2, HeapTupleData::t_self, TidHashKey::tid, TransactionIdPrecedes(), UnresolvedTupData::tuple, and TidHashKey::xmin.

Referenced by reform_and_rewrite_tuple().