PostgreSQL Source Code git master
rewriteheap.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/rel.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData *UnresolvedTup
 
typedef OldToNewMappingData *OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)

Definition at line 234 of file rewriteheap.c.

236{
241
242 /*
243 * To ease cleanup, make a separate context that will contain the
244 * RewriteState struct itself plus all subsidiary data.
245 */
247 "Table rewrite",
250
251 /* Create and fill in the state struct */
253
254 state->rs_old_rel = old_heap;
255 state->rs_new_rel = new_heap;
256 state->rs_buffer = NULL;
257 /* new_heap needn't be empty, just locked */
259 state->rs_oldest_xmin = oldest_xmin;
260 state->rs_freeze_xid = freeze_xid;
261 state->rs_cutoff_multi = cutoff_multi;
262 state->rs_cxt = rw_cxt;
264
265 /* Initialize hash tables used to track update chains */
266 hash_ctl.keysize = sizeof(TidHashKey);
267 hash_ctl.entrysize = sizeof(UnresolvedTupData);
268 hash_ctl.hcxt = state->rs_cxt;
269
270 state->rs_unresolved_tups =
271 hash_create("Rewrite / Unresolved ctids",
272 128, /* arbitrary initial size */
273 &hash_ctl,
275
276 hash_ctl.entrysize = sizeof(OldToNewMappingData);
277
278 state->rs_old_new_tid_map =
279 hash_create("Rewrite / Old to new tid map",
280 128, /* arbitrary initial size */
281 &hash_ctl,
283
285
287
288 return state;
289}
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:307
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
Definition bulk_write.c:87
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
#define palloc0_object(type)
Definition fe_memutils.h:75
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static int fb(int x)
@ MAIN_FORKNUM
Definition relpath.h:58
static void logical_begin_heap_rewrite(RewriteState state)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, fb(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, logical_begin_heap_rewrite(), MAIN_FORKNUM, MemoryContextSwitchTo(), palloc0_object, RelationGetNumberOfBlocks, and smgr_bulk_start_rel().

Referenced by heapam_relation_copy_for_cluster().
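
The facility is meant to be used as a unit: begin_heap_rewrite() sets up the state, every surviving tuple is pushed through rewrite_heap_tuple(), known-dead tuples are reported with rewrite_heap_dead_tuple(), and end_heap_rewrite() flushes the last page and cleans up. The following is a hedged sketch of that calling sequence only; it is not heapam_relation_copy_for_cluster() itself, and fetch_next_tuple / tuple_is_dead are hypothetical callbacks standing in for the caller's scan and visibility logic.

#include "postgres.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/rewriteheap.h"

/*
 * Hedged sketch of the begin/rewrite/end calling sequence, not a copy of
 * heapam_relation_copy_for_cluster().  The two callbacks are hypothetical
 * stand-ins for the caller's table scan and visibility test.
 */
static void
copy_heap_sketch(Relation old_heap, Relation new_heap,
                 TransactionId oldest_xmin, TransactionId freeze_xid,
                 MultiXactId cutoff_multi,
                 HeapTuple (*fetch_next_tuple) (void),
                 bool (*tuple_is_dead) (HeapTuple))
{
    RewriteState rwstate;
    HeapTuple   tuple;

    rwstate = begin_heap_rewrite(old_heap, new_heap,
                                 oldest_xmin, freeze_xid, cutoff_multi);

    while ((tuple = fetch_next_tuple()) != NULL)
    {
        if (tuple_is_dead(tuple))
            rewrite_heap_dead_tuple(rwstate, tuple);
        else
        {
            /* any per-tuple transformation would happen on this copy */
            HeapTuple   copied = heap_copytuple(tuple);

            rewrite_heap_tuple(rwstate, tuple, copied);
            heap_freetuple(copied);
        }
    }

    end_heap_rewrite(rwstate);
}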

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1157 of file rewriteheap.c.

1158{
1159 XLogRecPtr cutoff;
1160 XLogRecPtr redo;
1162 struct dirent *mapping_de;
1163 char path[MAXPGPATH + sizeof(PG_LOGICAL_MAPPINGS_DIR)];
1164
1165 /*
 1166 * We start off with a minimum of the last redo pointer. No new decoding
1167 * slot will start before that, so that's a safe upper bound for removal.
1168 */
1169 redo = GetRedoRecPtr();
1170
1171 /* now check for the restart ptrs from existing slots */
1173
1174 /* don't start earlier than the restart lsn */
1175 if (XLogRecPtrIsValid(cutoff) && redo < cutoff)
1176 cutoff = redo;
1177
1180 {
1181 Oid dboid;
1182 Oid relid;
1183 XLogRecPtr lsn;
1186 uint32 hi,
1187 lo;
1189
1190 if (strcmp(mapping_de->d_name, ".") == 0 ||
1191 strcmp(mapping_de->d_name, "..") == 0)
1192 continue;
1193
1194 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_MAPPINGS_DIR, mapping_de->d_name);
1195 de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
1196
1198 continue;
1199
1200 /* Skip over files that cannot be ours. */
1201 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1202 continue;
1203
1205 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1206 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1207
1208 lsn = ((uint64) hi) << 32 | lo;
1209
1210 if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
1211 {
1212 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1213 if (unlink(path) < 0)
1214 ereport(ERROR,
1216 errmsg("could not remove file \"%s\": %m", path)));
1217 }
1218 else
1219 {
1220 /* on some operating systems fsyncing a file requires O_RDWR */
1221 int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1222
1223 /*
1224 * The file cannot vanish due to concurrency since this function
1225 * is the only one removing logical mappings and only one
1226 * checkpoint can be in progress at a time.
1227 */
1228 if (fd < 0)
1229 ereport(ERROR,
1231 errmsg("could not open file \"%s\": %m", path)));
1232
1233 /*
1234 * We could try to avoid fsyncing files that either haven't
1235 * changed or have only been created since the checkpoint's start,
1236 * but it's currently not deemed worth the effort.
1237 */
1239 if (pg_fsync(fd) != 0)
1242 errmsg("could not fsync file \"%s\": %m", path)));
1244
1245 if (CloseTransientFile(fd) != 0)
1246 ereport(ERROR,
1248 errmsg("could not close file \"%s\": %m", path)));
1249 }
1250 }
1252
1253 /* persist directory entries to disk */
1255}
#define PG_BINARY
Definition c.h:1287
uint64_t uint64
Definition c.h:547
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
int errcode_for_file_access(void)
Definition elog.c:886
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int FreeDir(DIR *dir)
Definition fd.c:3005
int CloseTransientFile(int fd)
Definition fd.c:2851
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:753
int data_sync_elevel(int elevel)
Definition fd.c:3982
DIR * AllocateDir(const char *dirname)
Definition fd.c:2887
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2953
int pg_fsync(int fd)
Definition fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_REG
Definition file_utils.h:22
@ PGFILETYPE_ERROR
Definition file_utils.h:20
#define MAXPGPATH
#define snprintf
Definition port.h:260
unsigned int Oid
static int fd(const char *x, int i)
#define PG_LOGICAL_MAPPINGS_DIR
#define LOGICAL_REWRITE_FORMAT
Definition rewriteheap.h:54
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1367
Definition dirent.c:26
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6563
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AllocateDir(), CloseTransientFile(), data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), fd(), FreeDir(), fsync_fname(), get_dirent_type(), GetRedoRecPtr(), LOGICAL_REWRITE_FORMAT, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, PGFILETYPE_ERROR, PGFILETYPE_REG, pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf, and XLogRecPtrIsValid.

Referenced by CheckPointGuts().
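
The mapping file names scanned here encode, in order, the database OID, the relation OID, the rewrite's start LSN as two 32-bit halves, and the rewriting and creating XIDs. A hedged sketch of the parse-and-compare step follows, assuming only LOGICAL_REWRITE_FORMAT from rewriteheap.h; mapping_file_is_removable is an illustrative helper, not backend code, and applies the same removal rule as above (drop the file when its LSN is below the cutoff or when no valid cutoff exists).

#include "postgres.h"

#include "access/rewriteheap.h"     /* LOGICAL_REWRITE_FORMAT */
#include "access/xlogdefs.h"        /* XLogRecPtr, XLogRecPtrIsValid */

/* Sketch only: mirrors the filename parsing in CheckPointLogicalRewriteHeap(). */
static bool
mapping_file_is_removable(const char *fname, XLogRecPtr cutoff)
{
    Oid         dboid;
    Oid         relid;
    uint32      hi;
    uint32      lo;
    TransactionId rewrite_xid;
    TransactionId create_xid;
    XLogRecPtr  lsn;

    if (sscanf(fname, LOGICAL_REWRITE_FORMAT,
               &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
        return false;               /* not one of our mapping files */

    /* reassemble the start LSN from its two 32-bit halves */
    lsn = ((uint64) hi) << 32 | lo;

    return lsn < cutoff || !XLogRecPtrIsValid(cutoff);
}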

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 297 of file rewriteheap.c.

298{
301
302 /*
303 * Write any remaining tuples in the UnresolvedTups table. If we have any
304 * left, they should in fact be dead, but let's err on the safe side.
305 */
306 hash_seq_init(&seq_status, state->rs_unresolved_tups);
307
309 {
310 ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
312 }
313
314 /* Write the last page, if any */
315 if (state->rs_buffer)
316 {
317 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
318 state->rs_buffer = NULL;
319 }
320
321 smgr_bulk_finish(state->rs_bulkstate);
322
324
325 /* Deleting the context frees everything */
326 MemoryContextDelete(state->rs_cxt);
327}
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
Definition bulk_write.c:323
void smgr_bulk_finish(BulkWriteState *bulkstate)
Definition bulk_write.c:130
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition itemptr.h:184
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
static void raw_heap_insert(RewriteState state, HeapTuple tup)
static void logical_end_heap_rewrite(RewriteState state)

References fb(), hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid(), logical_end_heap_rewrite(), MemoryContextDelete(), raw_heap_insert(), smgr_bulk_finish(), and smgr_bulk_write().

Referenced by heapam_relation_copy_for_cluster().

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite(XLogReaderState *r)

Definition at line 1075 of file rewriteheap.c.

1076{
1077 char path[MAXPGPATH];
1078 int fd;
1080 uint32 len;
1081 char *data;
1082
1084
1085 snprintf(path, MAXPGPATH,
1087 PG_LOGICAL_MAPPINGS_DIR, xlrec->mapped_db, xlrec->mapped_rel,
1088 LSN_FORMAT_ARGS(xlrec->start_lsn),
1089 xlrec->mapped_xid, XLogRecGetXid(r));
1090
1091 fd = OpenTransientFile(path,
1093 if (fd < 0)
1094 ereport(ERROR,
1096 errmsg("could not create file \"%s\": %m", path)));
1097
1098 /*
1099 * Truncate all data that's not guaranteed to have been safely fsynced (by
1100 * previous record or by the last checkpoint).
1101 */
1103 if (ftruncate(fd, xlrec->offset) != 0)
1104 ereport(ERROR,
1106 errmsg("could not truncate file \"%s\" to %u: %m",
1107 path, (uint32) xlrec->offset)));
1109
1110 data = XLogRecGetData(r) + sizeof(*xlrec);
1111
1112 len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1113
1114 /* write out tail end of mapping file (again) */
1115 errno = 0;
1117 if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1118 {
1119 /* if write didn't set errno, assume problem is no disk space */
1120 if (errno == 0)
1121 errno = ENOSPC;
1122 ereport(ERROR,
1124 errmsg("could not write to file \"%s\": %m", path)));
1125 }
1127
1128 /*
1129 * Now fsync all previously written data. We could improve things and only
1130 * do this for the last write to a file, but the required bookkeeping
1131 * doesn't seem worth the trouble.
1132 */
1134 if (pg_fsync(fd) != 0)
1137 errmsg("could not fsync file \"%s\": %m", path)));
1139
1140 if (CloseTransientFile(fd) != 0)
1141 ereport(ERROR,
1143 errmsg("could not close file \"%s\": %m", path)));
1144}
const void size_t len
const void * data
#define pg_pwrite
Definition port.h:248
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define XLogRecGetData(decoder)
Definition xlogreader.h:414
#define XLogRecGetXid(decoder)
Definition xlogreader.h:411

References CloseTransientFile(), data, data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), fd(), len, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, pg_pwrite, pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().
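
The order of operations above is what makes replay idempotent: truncate the file back to the offset the record says is already durable, rewrite the tail carried in the record at exactly that offset, then fsync before the record counts as applied. Below is a minimal, hedged sketch of the same pattern in plain POSIX C; replay_file_tail is illustrative only, and transient-file handling, wait-event reporting, and ereport error reporting are deliberately omitted.

#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>

/* Sketch of the truncate-rewrite-fsync replay step; returns 0 on success. */
static int
replay_file_tail(const char *path, off_t offset, const void *data, size_t len)
{
    int         fd = open(path, O_WRONLY | O_CREAT, 0600);

    if (fd < 0)
        return -1;

    /* drop anything past the point known to be durable */
    if (ftruncate(fd, offset) != 0 ||
        /* re-write the tail exactly as carried in the WAL record */
        pwrite(fd, data, len, offset) != (ssize_t) len ||
        /* make it durable before the record is considered replayed */
        fsync(fd) != 0)
    {
        close(fd);
        return -1;
    }

    return close(fd);
}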

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 761 of file rewriteheap.c.

762{
765
766 /*
767 * We only need to persist these mappings if the rewritten table can be
768 * accessed during logical decoding, if not, we can skip doing any
769 * additional work.
770 */
771 state->rs_logical_rewrite =
773
774 if (!state->rs_logical_rewrite)
775 return;
776
778
779 /*
780 * If there are no logical slots in progress we don't need to do anything,
781 * there cannot be any remappings for relevant rows yet. The relation's
782 * lock protects us against races.
783 */
785 {
786 state->rs_logical_rewrite = false;
787 return;
788 }
789
790 state->rs_logical_xmin = logical_xmin;
791 state->rs_begin_lsn = GetXLogInsertRecPtr();
792 state->rs_num_rewrite_mappings = 0;
793
794 hash_ctl.keysize = sizeof(TransactionId);
795 hash_ctl.entrysize = sizeof(RewriteMappingFile);
796 hash_ctl.hcxt = state->rs_cxt;
797
798 state->rs_logical_mappings =
799 hash_create("Logical rewrite mapping",
800 128, /* arbitrary initial size */
801 &hash_ctl,
803}
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:3947
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition rel.h:693
struct RewriteMappingFile RewriteMappingFile
#define InvalidTransactionId
Definition transam.h:31
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9598

References fb(), GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, InvalidTransactionId, ProcArrayGetReplicationSlotXmin(), and RelationIsAccessibleInLogicalDecoding.

Referenced by begin_heap_rewrite().

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 907 of file rewriteheap.c.

908{
911
912 /* done, no logical rewrite in progress */
913 if (!state->rs_logical_rewrite)
914 return;
915
916 /* writeout remaining in-memory entries */
917 if (state->rs_num_rewrite_mappings > 0)
919
920 /* Iterate over all mappings we have written and fsync the files. */
921 hash_seq_init(&seq_status, state->rs_logical_mappings);
922 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
923 {
927 errmsg("could not fsync file \"%s\": %m", src->path)));
928 FileClose(src->vfd);
929 }
930 /* memory context cleanup will deal with the rest */
931}
int FileSync(File file, uint32 wait_event_info)
Definition fd.c:2332
void FileClose(File file)
Definition fd.c:1962
static void logical_heap_rewrite_flush_mappings(RewriteState state)
char path[MAXPGPATH]

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, and RewriteMappingFile::vfd.

Referenced by end_heap_rewrite().

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 809 of file rewriteheap.c.

810{
814
815 Assert(state->rs_logical_rewrite);
816
817 /* no logical rewrite in progress, no need to iterate over mappings */
818 if (state->rs_num_rewrite_mappings == 0)
819 return;
820
821 elog(DEBUG1, "flushing %u logical rewrite mapping entries",
822 state->rs_num_rewrite_mappings);
823
824 hash_seq_init(&seq_status, state->rs_logical_mappings);
825 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
826 {
827 char *waldata;
828 char *waldata_start;
830 Oid dboid;
831 uint32 len;
832 int written;
833 uint32 num_mappings = dclist_count(&src->mappings);
834
835 /* this file hasn't got any new mappings */
836 if (num_mappings == 0)
837 continue;
838
839 if (state->rs_old_rel->rd_rel->relisshared)
840 dboid = InvalidOid;
841 else
842 dboid = MyDatabaseId;
843
844 xlrec.num_mappings = num_mappings;
845 xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
846 xlrec.mapped_xid = src->xid;
847 xlrec.mapped_db = dboid;
848 xlrec.offset = src->off;
849 xlrec.start_lsn = state->rs_begin_lsn;
850
851 /* write all mappings consecutively */
852 len = num_mappings * sizeof(LogicalRewriteMappingData);
854
855 /*
856 * collect data we need to write out, but don't modify ondisk data yet
857 */
859 {
861
863
864 memcpy(waldata, &pmap->map, sizeof(pmap->map));
865 waldata += sizeof(pmap->map);
866
867 /* remove from the list and free */
868 dclist_delete_from(&src->mappings, &pmap->node);
869 pfree(pmap);
870
871 /* update bookkeeping */
872 state->rs_num_rewrite_mappings--;
873 }
874
875 Assert(dclist_count(&src->mappings) == 0);
877
878 /*
879 * Note that we deviate from the usual WAL coding practices here,
880 * check the above "Logical rewrite support" comment for reasoning.
881 */
882 written = FileWrite(src->vfd, waldata_start, len, src->off,
884 if (written != len)
887 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
888 written, len)));
889 src->off += len;
890
892 XLogRegisterData(&xlrec, sizeof(xlrec));
894
895 /* write xlog record */
897
899 }
900 Assert(state->rs_num_rewrite_mappings == 0);
901}
#define Assert(condition)
Definition c.h:873
static ssize_t FileWrite(File file, const void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
Definition fd.h:226
Oid MyDatabaseId
Definition globals.c:94
#define XLOG_HEAP2_REWRITE
Definition heapam_xlog.h:59
#define dclist_container(type, membername, ptr)
Definition ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition ilist.h:932
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition ilist.h:763
#define dclist_foreach_modify(iter, lhead)
Definition ilist.h:973
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define InvalidOid
#define RelationGetRelid(relation)
Definition rel.h:514
TransactionId xid
dclist_head mappings
dlist_node * cur
Definition ilist.h:200
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152

References Assert, dlist_mutable_iter::cur, dclist_container, dclist_count(), dclist_delete_from(), dclist_foreach_modify, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, len, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingFile::off, palloc(), RewriteMappingFile::path, pfree(), RelationGetRelid, RewriteMappingFile::vfd, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static

Definition at line 1001 of file rewriteheap.c.

1003{
1004 ItemPointerData new_tid = new_tuple->t_self;
1005 TransactionId cutoff = state->rs_logical_xmin;
1006 TransactionId xmin;
1007 TransactionId xmax;
1008 bool do_log_xmin = false;
1009 bool do_log_xmax = false;
1011
1012 /* no logical rewrite in progress, we don't need to log anything */
1013 if (!state->rs_logical_rewrite)
1014 return;
1015
1016 xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1017 /* use *GetUpdateXid to correctly deal with multixacts */
1018 xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1019
1020 /*
1021 * Log the mapping iff the tuple has been created recently.
1022 */
1023 if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1024 do_log_xmin = true;
1025
1026 if (!TransactionIdIsNormal(xmax))
1027 {
1028 /*
1029 * no xmax is set, can't have any permanent ones, so this check is
1030 * sufficient
1031 */
1032 }
1033 else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1034 {
1035 /* only locked, we don't care */
1036 }
1037 else if (!TransactionIdPrecedes(xmax, cutoff))
1038 {
1039 /* tuple has been deleted recently, log */
1040 do_log_xmax = true;
1041 }
1042
1043 /* if neither needs to be logged, we're done */
1044 if (!do_log_xmin && !do_log_xmax)
1045 return;
1046
1047 /* fill out mapping information */
1048 map.old_locator = state->rs_old_rel->rd_locator;
1049 map.old_tid = old_tid;
1050 map.new_locator = state->rs_new_rel->rd_locator;
1051 map.new_tid = new_tid;
1052
1053 /* ---
1054 * Now persist the mapping for the individual xids that are affected. We
1055 * need to log for both xmin and xmax if they aren't the same transaction
1056 * since the mapping files are per "affected" xid.
1057 * We don't muster all that much effort detecting whether xmin and xmax
1058 * are actually the same transaction, we just check whether the xid is the
1059 * same disregarding subtransactions. Logging too much is relatively
1060 * harmless and we could never do the check fully since subtransaction
1061 * data is thrown away during restarts.
1062 * ---
1063 */
1064 if (do_log_xmin)
1066 /* separately log mapping for xmax unless it'd be redundant */
1067 if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1069}
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
ItemPointerData new_tid
Definition rewriteheap.h:40
RelFileLocator old_locator
Definition rewriteheap.h:37
ItemPointerData old_tid
Definition rewriteheap.h:39
RelFileLocator new_locator
Definition rewriteheap.h:38
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References fb(), HEAP_XMAX_IS_LOCKED_ONLY(), HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().
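
The logging decision reduces to two independent tests against the logical-decoding cutoff. A hedged sketch of just that predicate follows; mapping_needed is an illustrative helper, and xmax_is_lock_only stands in for the HEAP_XMAX_IS_LOCKED_ONLY() check on the tuple's infomask.

#include "postgres.h"

#include "access/transam.h"

/* Sketch of the "which xids need a mapping?" test in logical_rewrite_heap_tuple(). */
static void
mapping_needed(TransactionId xmin, TransactionId xmax, bool xmax_is_lock_only,
               TransactionId cutoff, bool *log_xmin, bool *log_xmax)
{
    /* tuple created at or after the cutoff: map its old tid for xmin */
    *log_xmin = TransactionIdIsNormal(xmin) &&
        !TransactionIdPrecedes(xmin, cutoff);

    /* tuple deleted (not merely locked) at or after the cutoff: map for xmax */
    *log_xmax = TransactionIdIsNormal(xmax) &&
        !xmax_is_lock_only &&
        !TransactionIdPrecedes(xmax, cutoff);
}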

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
static

Definition at line 937 of file rewriteheap.c.

939{
942 Oid relid;
943 bool found;
944
945 relid = RelationGetRelid(state->rs_old_rel);
946
947 /* look for existing mappings for this 'mapped' xid */
948 src = hash_search(state->rs_logical_mappings, &xid,
949 HASH_ENTER, &found);
950
951 /*
952 * We haven't yet had the need to map anything for this xid, create
953 * per-xid data structures.
954 */
955 if (!found)
956 {
957 char path[MAXPGPATH];
958 Oid dboid;
959
960 if (state->rs_old_rel->rd_rel->relisshared)
961 dboid = InvalidOid;
962 else
963 dboid = MyDatabaseId;
964
965 snprintf(path, MAXPGPATH,
967 PG_LOGICAL_MAPPINGS_DIR, dboid, relid,
968 LSN_FORMAT_ARGS(state->rs_begin_lsn),
970
971 dclist_init(&src->mappings);
972 src->off = 0;
973 memcpy(src->path, path, sizeof(path));
974 src->vfd = PathNameOpenFile(path,
976 if (src->vfd < 0)
979 errmsg("could not create file \"%s\": %m", path)));
980 }
981
982 pmap = MemoryContextAlloc(state->rs_cxt,
984 memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
985 dclist_push_tail(&src->mappings, &pmap->node);
986 state->rs_num_rewrite_mappings++;
987
988 /*
989 * Write out buffer every time we've too many in-memory entries across all
990 * mapping files.
991 */
992 if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
994}
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1559
@ HASH_ENTER
Definition hsearch.h:114
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition ilist.h:709
static void dclist_init(dclist_head *head)
Definition ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
TransactionId GetCurrentTransactionId(void)
Definition xact.c:455

References dclist_init(), dclist_push_tail(), ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, RelationGetRelid, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

◆ raw_heap_insert()

static void raw_heap_insert(RewriteState state, HeapTuple tup)
static

Definition at line 596 of file rewriteheap.c.

597{
598 Page page;
601 Size len;
604
605 /*
606 * If the new tuple is too big for storage or contains already toasted
607 * out-of-line attributes from some other relation, invoke the toaster.
608 *
609 * Note: below this point, heaptup is the data we actually intend to store
610 * into the relation; tup is the caller's original untoasted data.
611 */
612 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
613 {
614 /* toast table entries should never be recursively toasted */
616 heaptup = tup;
617 }
619 {
621
622 /*
623 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
624 * for the TOAST table are not logically decoded. The main heap is
625 * WAL-logged as XLOG FPI records, which are not logically decoded.
626 */
628
630 options);
631 }
632 else
633 heaptup = tup;
634
635 len = MAXALIGN(heaptup->t_len); /* be conservative */
636
637 /*
638 * If we're gonna fail for oversize tuple, do it right away
639 */
640 if (len > MaxHeapTupleSize)
643 errmsg("row is too big: size %zu, maximum size %zu",
645
646 /* Compute desired extra freespace due to fillfactor option */
649
650 /* Now we can check to see if there's enough free space already. */
651 page = (Page) state->rs_buffer;
652 if (page)
653 {
655
657 {
658 /*
659 * Doesn't fit, so write out the existing page. It always
660 * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
661 * enforce saveFreeSpace unconditionally.
662 */
663 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
664 state->rs_buffer = NULL;
665 page = NULL;
666 state->rs_blockno++;
667 }
668 }
669
670 if (!page)
671 {
672 /* Initialize a new empty page */
673 state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
674 page = (Page) state->rs_buffer;
675 PageInit(page, BLCKSZ, 0);
676 }
677
678 /* And now we can insert the tuple into the page */
679 newoff = PageAddItem(page, heaptup->t_data, heaptup->t_len, InvalidOffsetNumber, false, true);
681 elog(ERROR, "failed to add tuple");
682
683 /* Update caller's t_self to the actual position where it was stored */
684 ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
685
686 /*
687 * Insert the correct position into CTID of the stored tuple, too, if the
688 * caller didn't supply a valid CTID.
689 */
690 if (!ItemPointerIsValid(&tup->t_data->t_ctid))
691 {
694
697
698 onpage_tup->t_ctid = tup->t_self;
699 }
700
701 /* If heaptup is a private copy, release it. */
702 if (heaptup != tup)
704}
Size PageGetHeapFreeSpace(const PageData *page)
Definition bufpage.c:990
void PageInit(Page page, Size pageSize, Size specialSize)
Definition bufpage.c:42
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition bufpage.h:243
static void * PageGetItem(PageData *page, const ItemIdData *itemId)
Definition bufpage.h:353
PageData * Page
Definition bufpage.h:81
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition bufpage.h:471
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
Definition bulk_write.c:347
#define MAXALIGN(LEN)
Definition c.h:826
size_t Size
Definition c.h:619
int errcode(int sqlerrcode)
Definition elog.c:863
#define HEAP_INSERT_SKIP_FSM
Definition heapam.h:37
#define HEAP_INSERT_NO_LOGICAL
Definition heapam.h:39
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition heaptoast.c:96
#define TOAST_TUPLE_THRESHOLD
Definition heaptoast.h:48
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
HeapTupleHeaderData * HeapTupleHeader
Definition htup.h:23
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
#define MaxHeapTupleSize
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition itemptr.h:135
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition itemptr.h:83
#define InvalidOffsetNumber
Definition off.h:26
uint16 OffsetNumber
Definition off.h:24
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition rel.h:389
#define HEAP_DEFAULT_FILLFACTOR
Definition rel.h:360

References Assert, elog, ereport, errcode(), errmsg(), ERROR, fb(), HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal(), InvalidOffsetNumber, ItemPointerIsValid(), ItemPointerSet(), len, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem(), PageGetItemId(), PageInit(), RelationGetTargetPageFreeSpace, smgr_bulk_get_buf(), smgr_bulk_write(), and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().
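
Two quantities drive the fit check above: the MAXALIGN'ed tuple length and the free space the fillfactor reserves on every page, which RelationGetTargetPageFreeSpace() derives as BLCKSZ * (100 - fillfactor) / 100. The following is a small, hedged sketch of that arithmetic in isolation; tuple_fits_on_page is illustrative, SKETCH_BLCKSZ stands in for the real BLCKSZ, and page_free_space stands in for PageGetHeapFreeSpace().

#include <stdbool.h>
#include <stddef.h>

#define SKETCH_BLCKSZ 8192          /* stand-in for the real BLCKSZ */

/* Sketch of raw_heap_insert()'s fit check against the fillfactor reserve. */
static bool
tuple_fits_on_page(size_t tuple_len, size_t page_free_space, int fillfactor)
{
    /* free space the fillfactor asks us to keep unused on every page */
    size_t      save_free_space = SKETCH_BLCKSZ * (100 - fillfactor) / 100;

    /* same test as "len + saveFreeSpace > pageFreeSpace", inverted */
    return tuple_len + save_free_space <= page_free_space;
}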

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)

Definition at line 546 of file rewriteheap.c.

547{
548 /*
549 * If we have already seen an earlier tuple in the update chain that
550 * points to this tuple, let's forget about that earlier tuple. It's in
551 * fact dead as well, our simple xmax < OldestXmin test in
552 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
553 * when xmin of a tuple is greater than xmax, which sounds
554 * counter-intuitive but is perfectly valid.
555 *
556 * We don't bother to try to detect the situation the other way round,
557 * when we encounter the dead tuple first and then the recently dead one
558 * that points to it. If that happens, we'll have some unmatched entries
559 * in the UnresolvedTups hash table at the end. That can happen anyway,
560 * because a vacuum might have removed the dead tuple in the chain before
561 * us.
562 */
565 bool found;
566
567 memset(&hashkey, 0, sizeof(hashkey));
569 hashkey.tid = old_tuple->t_self;
570
571 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
572 HASH_FIND, NULL);
573
574 if (unresolved != NULL)
575 {
576 /* Need to free the contained tuple as well as the hashtable entry */
578 hash_search(state->rs_unresolved_tups, &hashkey,
579 HASH_REMOVE, &found);
580 Assert(found);
581 return true;
582 }
583
584 return false;
585}
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115

References Assert, fb(), HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), and HeapTupleHeaderGetXmin().

Referenced by heapam_relation_copy_for_cluster().

◆ rewrite_heap_tuple()

void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)

Definition at line 341 of file rewriteheap.c.

343{
345 ItemPointerData old_tid;
347 bool found;
348 bool free_new;
349
351
352 /*
353 * Copy the original tuple's visibility information into new_tuple.
354 *
355 * XXX we might later need to copy some t_infomask2 bits, too? Right now,
356 * we intentionally clear the HOT status bits.
357 */
358 memcpy(&new_tuple->t_data->t_choice.t_heap,
359 &old_tuple->t_data->t_choice.t_heap,
360 sizeof(HeapTupleFields));
361
362 new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
363 new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
364 new_tuple->t_data->t_infomask |=
365 old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
366
367 /*
368 * While we have our hands on the tuple, we may as well freeze any
369 * eligible xmin or xmax, so that future VACUUM effort can be saved.
370 */
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
376
377 /*
378 * Invalid ctid means that ctid should point to the tuple itself. We'll
379 * override it later if the tuple is part of an update chain.
380 */
381 ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
382
383 /*
384 * If the tuple has been updated, check the old-to-new mapping hash table.
385 *
386 * Note that this check relies on the HeapTupleSatisfiesVacuum() in
387 * heapam_relation_copy_for_cluster() to have set hint bits.
388 */
389 if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
392 !(ItemPointerEquals(&(old_tuple->t_self),
393 &(old_tuple->t_data->t_ctid))))
394 {
396
397 memset(&hashkey, 0, sizeof(hashkey));
399 hashkey.tid = old_tuple->t_data->t_ctid;
400
402 hash_search(state->rs_old_new_tid_map, &hashkey,
403 HASH_FIND, NULL);
404
405 if (mapping != NULL)
406 {
407 /*
408 * We've already copied the tuple that t_ctid points to, so we can
409 * set the ctid of this tuple to point to the new location, and
410 * insert it right away.
411 */
412 new_tuple->t_data->t_ctid = mapping->new_tid;
413
414 /* We don't need the mapping entry anymore */
415 hash_search(state->rs_old_new_tid_map, &hashkey,
416 HASH_REMOVE, &found);
417 Assert(found);
418 }
419 else
420 {
421 /*
422 * We haven't seen the tuple t_ctid points to yet. Stash this
423 * tuple into unresolved_tups to be written later.
424 */
426
427 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
428 HASH_ENTER, &found);
429 Assert(!found);
430
431 unresolved->old_tid = old_tuple->t_self;
433
434 /*
435 * We can't do anything more now, since we don't know where the
436 * tuple will be written.
437 */
439 return;
440 }
441 }
442
443 /*
444 * Now we will write the tuple, and then check to see if it is the B tuple
445 * in any new or known pair. When we resolve a known pair, we will be
446 * able to write that pair's A tuple, and then we have to check if it
447 * resolves some other pair. Hence, we need a loop here.
448 */
449 old_tid = old_tuple->t_self;
450 free_new = false;
451
452 for (;;)
453 {
454 ItemPointerData new_tid;
455
456 /* Insert the tuple and find out where it's put in new_heap */
458 new_tid = new_tuple->t_self;
459
461
462 /*
463 * If the tuple is the updated version of a row, and the prior version
464 * wouldn't be DEAD yet, then we need to either resolve the prior
465 * version (if it's waiting in rs_unresolved_tups), or make an entry
466 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
467 * previous tuple's xmax would equal this one's xmin, so it's
468 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
469 */
470 if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
472 state->rs_oldest_xmin))
473 {
474 /*
475 * Okay, this is B in an update pair. See if we've seen A.
476 */
478
479 memset(&hashkey, 0, sizeof(hashkey));
481 hashkey.tid = old_tid;
482
483 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
484 HASH_FIND, NULL);
485
486 if (unresolved != NULL)
487 {
488 /*
489 * We have seen and memorized the previous tuple already. Now
490 * that we know where we inserted the tuple its t_ctid points
491 * to, fix its t_ctid and insert it to the new heap.
492 */
493 if (free_new)
495 new_tuple = unresolved->tuple;
496 free_new = true;
497 old_tid = unresolved->old_tid;
498 new_tuple->t_data->t_ctid = new_tid;
499
500 /*
501 * We don't need the hash entry anymore, but don't free its
502 * tuple just yet.
503 */
504 hash_search(state->rs_unresolved_tups, &hashkey,
505 HASH_REMOVE, &found);
506 Assert(found);
507
508 /* loop back to insert the previous tuple in the chain */
509 continue;
510 }
511 else
512 {
513 /*
514 * Remember the new tid of this tuple. We'll use it to set the
515 * ctid when we find the previous tuple in the chain.
516 */
518
519 mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
520 HASH_ENTER, &found);
521 Assert(!found);
522
523 mapping->new_tid = new_tid;
524 }
525 }
526
527 /* Done with this (chain of) tuples, for now */
528 if (free_new)
530 break;
531 }
532
534}
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
Definition heapam.c:7482
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
HeapTuple heap_copytuple(HeapTuple tuple)
Definition heaptuple.c:778
#define HEAP_XACT_MASK
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
#define HEAP_XMAX_INVALID
#define HEAP_UPDATED
bool ItemPointerEquals(const ItemPointerData *pointer1, const ItemPointerData *pointer2)
Definition itemptr.c:35
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
OldToNewMappingData * OldToNewMapping

References Assert, fb(), HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), HeapTupleHeaderIndicatesMovedPartitions(), HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid(), logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), raw_heap_insert(), and TransactionIdPrecedes().

Referenced by reform_and_rewrite_tuple().