PostgreSQL Source Code git master
Loading...
Searching...
No Matches
rewriteheap.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/wait_event.h"
Include dependency graph for rewriteheap.c:

Go to the source code of this file.

Data Structures

struct  RewriteStateData
 
struct  TidHashKey
 
struct  UnresolvedTupData
 
struct  OldToNewMappingData
 
struct  RewriteMappingFile
 
struct  RewriteMappingDataEntry
 

Typedefs

typedef struct RewriteStateData RewriteStateData
 
typedef UnresolvedTupData * UnresolvedTup
 
typedef OldToNewMappingData * OldToNewMapping
 
typedef struct RewriteMappingFile RewriteMappingFile
 
typedef struct RewriteMappingDataEntry RewriteMappingDataEntry
 

Functions

static void raw_heap_insert (RewriteState state, HeapTuple tup)
 
static void logical_begin_heap_rewrite (RewriteState state)
 
static void logical_rewrite_heap_tuple (RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
 
static void logical_end_heap_rewrite (RewriteState state)
 
RewriteState begin_heap_rewrite (Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
 
void end_heap_rewrite (RewriteState state)
 
void rewrite_heap_tuple (RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
 
bool rewrite_heap_dead_tuple (RewriteState state, HeapTuple old_tuple)
 
static void logical_heap_rewrite_flush_mappings (RewriteState state)
 
static void logical_rewrite_log_mapping (RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
 
void heap_xlog_logical_rewrite (XLogReaderState *r)
 
void CheckPointLogicalRewriteHeap (void)
 

Typedef Documentation

◆ OldToNewMapping

◆ RewriteMappingDataEntry

◆ RewriteMappingFile

◆ RewriteStateData

◆ UnresolvedTup

Function Documentation

◆ begin_heap_rewrite()

RewriteState begin_heap_rewrite ( Relation  old_heap,
Relation  new_heap,
TransactionId  oldest_xmin,
TransactionId  freeze_xid,
MultiXactId  cutoff_multi 
)

Definition at line 235 of file rewriteheap.c.

237{
242
243 /*
244 * To ease cleanup, make a separate context that will contain the
245 * RewriteState struct itself plus all subsidiary data.
246 */
248 "Table rewrite",
251
252 /* Create and fill in the state struct */
254
255 state->rs_old_rel = old_heap;
256 state->rs_new_rel = new_heap;
257 state->rs_buffer = NULL;
258 /* new_heap needn't be empty, just locked */
260 state->rs_oldest_xmin = oldest_xmin;
261 state->rs_freeze_xid = freeze_xid;
262 state->rs_cutoff_multi = cutoff_multi;
263 state->rs_cxt = rw_cxt;
265
266 /* Initialize hash tables used to track update chains */
267 hash_ctl.keysize = sizeof(TidHashKey);
268 hash_ctl.entrysize = sizeof(UnresolvedTupData);
269 hash_ctl.hcxt = state->rs_cxt;
270
271 state->rs_unresolved_tups =
272 hash_create("Rewrite / Unresolved ctids",
273 128, /* arbitrary initial size */
274 &hash_ctl,
276
277 hash_ctl.entrysize = sizeof(OldToNewMappingData);
278
279 state->rs_old_new_tid_map =
280 hash_create("Rewrite / Old to new tid map",
281 128, /* arbitrary initial size */
282 &hash_ctl,
284
286
288
289 return state;
290}
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:307
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
Definition bulk_write.c:87
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
#define palloc0_object(type)
Definition fe_memutils.h:75
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
static int fb(int x)
@ MAIN_FORKNUM
Definition relpath.h:58
static void logical_begin_heap_rewrite(RewriteState state)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CurrentMemoryContext, fb(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, logical_begin_heap_rewrite(), MAIN_FORKNUM, MemoryContextSwitchTo(), palloc0_object, RelationGetNumberOfBlocks, and smgr_bulk_start_rel().

Referenced by heapam_relation_copy_for_cluster().

◆ CheckPointLogicalRewriteHeap()

void CheckPointLogicalRewriteHeap ( void  )

Definition at line 1158 of file rewriteheap.c.

1159{
1160 XLogRecPtr cutoff;
1161 XLogRecPtr redo;
1163 struct dirent *mapping_de;
1164 char path[MAXPGPATH + sizeof(PG_LOGICAL_MAPPINGS_DIR)];
1165
 1166 /*
 1167 * We start off with a minimum of the last redo pointer. No new decoding
 1168 * slot will start before that, so that's a safe upper bound for removal.
 1169 */
1170 redo = GetRedoRecPtr();
1171
1172 /* now check for the restart ptrs from existing slots */
1174
1175 /* don't start earlier than the restart lsn */
1176 if (XLogRecPtrIsValid(cutoff) && redo < cutoff)
1177 cutoff = redo;
1178
1181 {
1182 Oid dboid;
1183 Oid relid;
1184 XLogRecPtr lsn;
1187 uint32 hi,
1188 lo;
1190
1191 if (strcmp(mapping_de->d_name, ".") == 0 ||
1192 strcmp(mapping_de->d_name, "..") == 0)
1193 continue;
1194
1195 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_MAPPINGS_DIR, mapping_de->d_name);
1196 de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
1197
1199 continue;
1200
1201 /* Skip over files that cannot be ours. */
1202 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1203 continue;
1204
1206 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1207 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1208
1209 lsn = ((uint64) hi) << 32 | lo;
1210
1211 if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
1212 {
1213 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1214 if (unlink(path) < 0)
1215 ereport(ERROR,
1217 errmsg("could not remove file \"%s\": %m", path)));
1218 }
1219 else
1220 {
1221 /* on some operating systems fsyncing a file requires O_RDWR */
1222 int fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1223
1224 /*
1225 * The file cannot vanish due to concurrency since this function
1226 * is the only one removing logical mappings and only one
1227 * checkpoint can be in progress at a time.
1228 */
1229 if (fd < 0)
1230 ereport(ERROR,
1232 errmsg("could not open file \"%s\": %m", path)));
1233
1234 /*
1235 * We could try to avoid fsyncing files that either haven't
1236 * changed or have only been created since the checkpoint's start,
1237 * but it's currently not deemed worth the effort.
1238 */
1240 if (pg_fsync(fd) != 0)
1243 errmsg("could not fsync file \"%s\": %m", path)));
1245
1246 if (CloseTransientFile(fd) != 0)
1247 ereport(ERROR,
1249 errmsg("could not close file \"%s\": %m", path)));
1250 }
1251 }
1253
1254 /* persist directory entries to disk */
1256}
#define PG_BINARY
Definition c.h:1376
uint64_t uint64
Definition c.h:619
uint32_t uint32
Definition c.h:618
uint32 TransactionId
Definition c.h:738
int errcode_for_file_access(void)
Definition elog.c:897
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
int data_sync_elevel(int elevel)
Definition fd.c:3986
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_REG
Definition file_utils.h:22
@ PGFILETYPE_ERROR
Definition file_utils.h:20
static char * errmsg
#define MAXPGPATH
#define snprintf
Definition port.h:260
unsigned int Oid
static int fd(const char *x, int i)
#define PG_LOGICAL_MAPPINGS_DIR
#define LOGICAL_REWRITE_FORMAT
Definition rewriteheap.h:54
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1371
Definition dirent.c:26
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6547
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AllocateDir(), CloseTransientFile(), data_sync_elevel(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), FreeDir(), fsync_fname(), get_dirent_type(), GetRedoRecPtr(), LOGICAL_REWRITE_FORMAT, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, PGFILETYPE_ERROR, PGFILETYPE_REG, pgstat_report_wait_end(), pgstat_report_wait_start(), ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf, and XLogRecPtrIsValid.

Referenced by CheckPointGuts().

◆ end_heap_rewrite()

void end_heap_rewrite ( RewriteState  state)

Definition at line 298 of file rewriteheap.c.

299{
302
303 /*
304 * Write any remaining tuples in the UnresolvedTups table. If we have any
305 * left, they should in fact be dead, but let's err on the safe side.
306 */
307 hash_seq_init(&seq_status, state->rs_unresolved_tups);
308
310 {
311 ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
313 }
314
315 /* Write the last page, if any */
316 if (state->rs_buffer)
317 {
318 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
319 state->rs_buffer = NULL;
320 }
321
322 smgr_bulk_finish(state->rs_bulkstate);
323
325
326 /* Deleting the context frees everything */
327 MemoryContextDelete(state->rs_cxt);
328}
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
Definition bulk_write.c:323
void smgr_bulk_finish(BulkWriteState *bulkstate)
Definition bulk_write.c:130
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition itemptr.h:184
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
static void raw_heap_insert(RewriteState state, HeapTuple tup)
static void logical_end_heap_rewrite(RewriteState state)

References fb(), hash_seq_init(), hash_seq_search(), ItemPointerSetInvalid(), logical_end_heap_rewrite(), MemoryContextDelete(), raw_heap_insert(), smgr_bulk_finish(), and smgr_bulk_write().

Referenced by heapam_relation_copy_for_cluster().

◆ heap_xlog_logical_rewrite()

void heap_xlog_logical_rewrite ( XLogReaderState r)

Definition at line 1076 of file rewriteheap.c.

1077{
1078 char path[MAXPGPATH];
1079 int fd;
1081 uint32 len;
1082 char *data;
1083
1085
1086 snprintf(path, MAXPGPATH,
1088 PG_LOGICAL_MAPPINGS_DIR, xlrec->mapped_db, xlrec->mapped_rel,
1089 LSN_FORMAT_ARGS(xlrec->start_lsn),
1090 xlrec->mapped_xid, XLogRecGetXid(r));
1091
1092 fd = OpenTransientFile(path,
1094 if (fd < 0)
1095 ereport(ERROR,
1097 errmsg("could not create file \"%s\": %m", path)));
1098
1099 /*
1100 * Truncate all data that's not guaranteed to have been safely fsynced (by
1101 * previous record or by the last checkpoint).
1102 */
1104 if (ftruncate(fd, xlrec->offset) != 0)
1105 ereport(ERROR,
1107 errmsg("could not truncate file \"%s\" to %u: %m",
1108 path, (uint32) xlrec->offset)));
1110
1111 data = XLogRecGetData(r) + sizeof(*xlrec);
1112
1113 len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1114
1115 /* write out tail end of mapping file (again) */
1116 errno = 0;
1118 if (pg_pwrite(fd, data, len, xlrec->offset) != len)
1119 {
1120 /* if write didn't set errno, assume problem is no disk space */
1121 if (errno == 0)
1122 errno = ENOSPC;
1123 ereport(ERROR,
1125 errmsg("could not write to file \"%s\": %m", path)));
1126 }
1128
1129 /*
1130 * Now fsync all previously written data. We could improve things and only
1131 * do this for the last write to a file, but the required bookkeeping
1132 * doesn't seem worth the trouble.
1133 */
1135 if (pg_fsync(fd) != 0)
1138 errmsg("could not fsync file \"%s\": %m", path)));
1140
1141 if (CloseTransientFile(fd) != 0)
1142 ereport(ERROR,
1144 errmsg("could not close file \"%s\": %m", path)));
1145}
const void size_t len
const void * data
#define pg_pwrite
Definition port.h:248
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define XLogRecGetData(decoder)
Definition xlogreader.h:414
#define XLogRecGetXid(decoder)
Definition xlogreader.h:411

References CloseTransientFile(), data, data_sync_elevel(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), len, LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, MAXPGPATH, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_LOGICAL_MAPPINGS_DIR, pg_pwrite, pgstat_report_wait_end(), pgstat_report_wait_start(), snprintf, XLogRecGetData, and XLogRecGetXid.

Referenced by heap2_redo().

◆ logical_begin_heap_rewrite()

static void logical_begin_heap_rewrite ( RewriteState  state)
static

Definition at line 762 of file rewriteheap.c.

763{
766
767 /*
768 * We only need to persist these mappings if the rewritten table can be
769 * accessed during logical decoding, if not, we can skip doing any
770 * additional work.
771 */
772 state->rs_logical_rewrite =
774
775 if (!state->rs_logical_rewrite)
776 return;
777
779
780 /*
781 * If there are no logical slots in progress we don't need to do anything,
782 * there cannot be any remappings for relevant rows yet. The relation's
783 * lock protects us against races.
784 */
786 {
787 state->rs_logical_rewrite = false;
788 return;
789 }
790
791 state->rs_logical_xmin = logical_xmin;
792 state->rs_begin_lsn = GetXLogInsertRecPtr();
793 state->rs_num_rewrite_mappings = 0;
794
795 hash_ctl.keysize = sizeof(TransactionId);
796 hash_ctl.entrysize = sizeof(RewriteMappingFile);
797 hash_ctl.hcxt = state->rs_cxt;
798
799 state->rs_logical_mappings =
800 hash_create("Logical rewrite mapping",
801 128, /* arbitrary initial size */
802 &hash_ctl,
804}
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:3975
#define RelationIsAccessibleInLogicalDecoding(relation)
Definition rel.h:693
struct RewriteMappingFile RewriteMappingFile
#define InvalidTransactionId
Definition transam.h:31
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9614

References fb(), GetXLogInsertRecPtr(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, InvalidTransactionId, ProcArrayGetReplicationSlotXmin(), and RelationIsAccessibleInLogicalDecoding.

Referenced by begin_heap_rewrite().

◆ logical_end_heap_rewrite()

static void logical_end_heap_rewrite ( RewriteState  state)
static

Definition at line 908 of file rewriteheap.c.

909{
912
913 /* done, no logical rewrite in progress */
914 if (!state->rs_logical_rewrite)
915 return;
916
917 /* writeout remaining in-memory entries */
918 if (state->rs_num_rewrite_mappings > 0)
920
921 /* Iterate over all mappings we have written and fsync the files. */
922 hash_seq_init(&seq_status, state->rs_logical_mappings);
923 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
924 {
928 errmsg("could not fsync file \"%s\": %m", src->path)));
929 FileClose(src->vfd);
930 }
931 /* memory context cleanup will deal with the rest */
932}
int FileSync(File file, uint32 wait_event_info)
Definition fd.c:2336
void FileClose(File file)
Definition fd.c:1966
static void logical_heap_rewrite_flush_mappings(RewriteState state)
char path[MAXPGPATH]

References data_sync_elevel(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), FileClose(), FileSync(), hash_seq_init(), hash_seq_search(), logical_heap_rewrite_flush_mappings(), RewriteMappingFile::path, and RewriteMappingFile::vfd.

Referenced by end_heap_rewrite().

◆ logical_heap_rewrite_flush_mappings()

static void logical_heap_rewrite_flush_mappings ( RewriteState  state)
static

Definition at line 810 of file rewriteheap.c.

811{
815
816 Assert(state->rs_logical_rewrite);
817
818 /* no logical rewrite in progress, no need to iterate over mappings */
819 if (state->rs_num_rewrite_mappings == 0)
820 return;
821
822 elog(DEBUG1, "flushing %u logical rewrite mapping entries",
823 state->rs_num_rewrite_mappings);
824
825 hash_seq_init(&seq_status, state->rs_logical_mappings);
826 while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
827 {
828 char *waldata;
829 char *waldata_start;
831 Oid dboid;
832 uint32 len;
833 int written;
834 uint32 num_mappings = dclist_count(&src->mappings);
835
836 /* this file hasn't got any new mappings */
837 if (num_mappings == 0)
838 continue;
839
840 if (state->rs_old_rel->rd_rel->relisshared)
841 dboid = InvalidOid;
842 else
843 dboid = MyDatabaseId;
844
845 xlrec.num_mappings = num_mappings;
846 xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
847 xlrec.mapped_xid = src->xid;
848 xlrec.mapped_db = dboid;
849 xlrec.offset = src->off;
850 xlrec.start_lsn = state->rs_begin_lsn;
851
852 /* write all mappings consecutively */
853 len = num_mappings * sizeof(LogicalRewriteMappingData);
855
856 /*
857 * collect data we need to write out, but don't modify ondisk data yet
858 */
860 {
862
864
865 memcpy(waldata, &pmap->map, sizeof(pmap->map));
866 waldata += sizeof(pmap->map);
867
868 /* remove from the list and free */
869 dclist_delete_from(&src->mappings, &pmap->node);
870 pfree(pmap);
871
872 /* update bookkeeping */
873 state->rs_num_rewrite_mappings--;
874 }
875
876 Assert(dclist_count(&src->mappings) == 0);
878
879 /*
880 * Note that we deviate from the usual WAL coding practices here,
881 * check the above "Logical rewrite support" comment for reasoning.
882 */
883 written = FileWrite(src->vfd, waldata_start, len, src->off,
885 if (written != len)
888 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
889 written, len)));
890 src->off += len;
891
893 XLogRegisterData(&xlrec, sizeof(xlrec));
895
896 /* write xlog record */
898
900 }
901 Assert(state->rs_num_rewrite_mappings == 0);
902}
#define Assert(condition)
Definition c.h:945
static ssize_t FileWrite(File file, const void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
Definition fd.h:237
Oid MyDatabaseId
Definition globals.c:94
#define XLOG_HEAP2_REWRITE
Definition heapam_xlog.h:59
#define dclist_container(type, membername, ptr)
Definition ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition ilist.h:932
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition ilist.h:763
#define dclist_foreach_modify(iter, lhead)
Definition ilist.h:973
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define InvalidOid
#define RelationGetRelid(relation)
Definition rel.h:514
TransactionId xid
dclist_head mappings
dlist_node * cur
Definition ilist.h:200
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:479
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:369
void XLogBeginInsert(void)
Definition xloginsert.c:153

References Assert, dlist_mutable_iter::cur, dclist_container, dclist_count(), dclist_delete_from(), dclist_foreach_modify, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg, ERROR, fb(), FileWrite(), hash_seq_init(), hash_seq_search(), InvalidOid, len, RewriteMappingFile::mappings, MyDatabaseId, RewriteMappingFile::off, palloc(), RewriteMappingFile::path, pfree(), RelationGetRelid, RewriteMappingFile::vfd, RewriteMappingFile::xid, XLOG_HEAP2_REWRITE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by logical_end_heap_rewrite(), and logical_rewrite_log_mapping().

◆ logical_rewrite_heap_tuple()

static void logical_rewrite_heap_tuple ( RewriteState  state,
ItemPointerData  old_tid,
HeapTuple  new_tuple 
)
static

Definition at line 1002 of file rewriteheap.c.

1004{
1005 ItemPointerData new_tid = new_tuple->t_self;
1006 TransactionId cutoff = state->rs_logical_xmin;
1007 TransactionId xmin;
1008 TransactionId xmax;
1009 bool do_log_xmin = false;
1010 bool do_log_xmax = false;
1012
1013 /* no logical rewrite in progress, we don't need to log anything */
1014 if (!state->rs_logical_rewrite)
1015 return;
1016
1017 xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1018 /* use *GetUpdateXid to correctly deal with multixacts */
1019 xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1020
1021 /*
1022 * Log the mapping iff the tuple has been created recently.
1023 */
1024 if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1025 do_log_xmin = true;
1026
1027 if (!TransactionIdIsNormal(xmax))
1028 {
1029 /*
1030 * no xmax is set, can't have any permanent ones, so this check is
1031 * sufficient
1032 */
1033 }
1034 else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1035 {
1036 /* only locked, we don't care */
1037 }
1038 else if (!TransactionIdPrecedes(xmax, cutoff))
1039 {
1040 /* tuple has been deleted recently, log */
1041 do_log_xmax = true;
1042 }
1043
1044 /* if neither needs to be logged, we're done */
1045 if (!do_log_xmin && !do_log_xmax)
1046 return;
1047
1048 /* fill out mapping information */
1049 map.old_locator = state->rs_old_rel->rd_locator;
1050 map.old_tid = old_tid;
1051 map.new_locator = state->rs_new_rel->rd_locator;
1052 map.new_tid = new_tid;
1053
1054 /* ---
1055 * Now persist the mapping for the individual xids that are affected. We
1056 * need to log for both xmin and xmax if they aren't the same transaction
1057 * since the mapping files are per "affected" xid.
1058 * We don't muster all that much effort detecting whether xmin and xmax
1059 * are actually the same transaction, we just check whether the xid is the
1060 * same disregarding subtransactions. Logging too much is relatively
1061 * harmless and we could never do the check fully since subtransaction
1062 * data is thrown away during restarts.
1063 * ---
1064 */
1065 if (do_log_xmin)
1067 /* separately log mapping for xmax unless it'd be redundant */
1068 if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1070}
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
ItemPointerData new_tid
Definition rewriteheap.h:40
RelFileLocator old_locator
Definition rewriteheap.h:37
ItemPointerData old_tid
Definition rewriteheap.h:39
RelFileLocator new_locator
Definition rewriteheap.h:38
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References fb(), HEAP_XMAX_IS_LOCKED_ONLY(), HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), logical_rewrite_log_mapping(), LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, TransactionIdEquals, TransactionIdIsNormal, and TransactionIdPrecedes().

Referenced by rewrite_heap_tuple().

◆ logical_rewrite_log_mapping()

static void logical_rewrite_log_mapping ( RewriteState  state,
TransactionId  xid,
LogicalRewriteMappingData map 
)
static

Definition at line 938 of file rewriteheap.c.

940{
943 Oid relid;
944 bool found;
945
946 relid = RelationGetRelid(state->rs_old_rel);
947
948 /* look for existing mappings for this 'mapped' xid */
949 src = hash_search(state->rs_logical_mappings, &xid,
950 HASH_ENTER, &found);
951
952 /*
953 * We haven't yet had the need to map anything for this xid, create
954 * per-xid data structures.
955 */
956 if (!found)
957 {
958 char path[MAXPGPATH];
959 Oid dboid;
960
961 if (state->rs_old_rel->rd_rel->relisshared)
962 dboid = InvalidOid;
963 else
964 dboid = MyDatabaseId;
965
966 snprintf(path, MAXPGPATH,
968 PG_LOGICAL_MAPPINGS_DIR, dboid, relid,
969 LSN_FORMAT_ARGS(state->rs_begin_lsn),
971
972 dclist_init(&src->mappings);
973 src->off = 0;
974 memcpy(src->path, path, sizeof(path));
975 src->vfd = PathNameOpenFile(path,
977 if (src->vfd < 0)
980 errmsg("could not create file \"%s\": %m", path)));
981 }
982
983 pmap = MemoryContextAlloc(state->rs_cxt,
985 memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
986 dclist_push_tail(&src->mappings, &pmap->node);
987 state->rs_num_rewrite_mappings++;
988
989 /*
990 * Write out buffer every time we've too many in-memory entries across all
991 * mapping files.
992 */
993 if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
995}
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1563
@ HASH_ENTER
Definition hsearch.h:114
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition ilist.h:709
static void dclist_init(dclist_head *head)
Definition ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
TransactionId GetCurrentTransactionId(void)
Definition xact.c:456

References dclist_init(), dclist_push_tail(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), GetCurrentTransactionId(), HASH_ENTER, hash_search(), InvalidOid, logical_heap_rewrite_flush_mappings(), LOGICAL_REWRITE_FORMAT, LSN_FORMAT_ARGS, RewriteMappingFile::mappings, MAXPGPATH, MemoryContextAlloc(), MyDatabaseId, RewriteMappingFile::off, RewriteMappingFile::path, PathNameOpenFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, RelationGetRelid, snprintf, and RewriteMappingFile::vfd.

Referenced by logical_rewrite_heap_tuple().

◆ raw_heap_insert()

static void raw_heap_insert ( RewriteState  state,
HeapTuple  tup 
)
static

Definition at line 597 of file rewriteheap.c.

598{
599 Page page;
602 Size len;
605
606 /*
607 * If the new tuple is too big for storage or contains already toasted
608 * out-of-line attributes from some other relation, invoke the toaster.
609 *
610 * Note: below this point, heaptup is the data we actually intend to store
611 * into the relation; tup is the caller's original untoasted data.
612 */
613 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
614 {
615 /* toast table entries should never be recursively toasted */
617 heaptup = tup;
618 }
620 {
622
623 /*
624 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
625 * for the TOAST table are not logically decoded. The main heap is
626 * WAL-logged as XLOG FPI records, which are not logically decoded.
627 */
629
631 options);
632 }
633 else
634 heaptup = tup;
635
636 len = MAXALIGN(heaptup->t_len); /* be conservative */
637
638 /*
639 * If we're gonna fail for oversize tuple, do it right away
640 */
641 if (len > MaxHeapTupleSize)
644 errmsg("row is too big: size %zu, maximum size %zu",
646
647 /* Compute desired extra freespace due to fillfactor option */
650
651 /* Now we can check to see if there's enough free space already. */
652 page = (Page) state->rs_buffer;
653 if (page)
654 {
656
658 {
659 /*
660 * Doesn't fit, so write out the existing page. It always
661 * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
662 * enforce saveFreeSpace unconditionally.
663 */
664 smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
665 state->rs_buffer = NULL;
666 page = NULL;
667 state->rs_blockno++;
668 }
669 }
670
671 if (!page)
672 {
673 /* Initialize a new empty page */
674 state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
675 page = (Page) state->rs_buffer;
676 PageInit(page, BLCKSZ, 0);
677 }
678
679 /* And now we can insert the tuple into the page */
680 newoff = PageAddItem(page, heaptup->t_data, heaptup->t_len, InvalidOffsetNumber, false, true);
682 elog(ERROR, "failed to add tuple");
683
684 /* Update caller's t_self to the actual position where it was stored */
685 ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
686
687 /*
688 * Insert the correct position into CTID of the stored tuple, too, if the
689 * caller didn't supply a valid CTID.
690 */
691 if (!ItemPointerIsValid(&tup->t_data->t_ctid))
692 {
695
698
699 onpage_tup->t_ctid = tup->t_self;
700 }
701
702 /* If heaptup is a private copy, release it. */
703 if (heaptup != tup)
705}
Size PageGetHeapFreeSpace(const PageData *page)
Definition bufpage.c:990
void PageInit(Page page, Size pageSize, Size specialSize)
Definition bufpage.c:42
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition bufpage.h:269
static void * PageGetItem(PageData *page, const ItemIdData *itemId)
Definition bufpage.h:379
PageData * Page
Definition bufpage.h:81
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition bufpage.h:504
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
Definition bulk_write.c:347
#define MAXALIGN(LEN)
Definition c.h:898
size_t Size
Definition c.h:691
int errcode(int sqlerrcode)
Definition elog.c:874
#define HEAP_INSERT_SKIP_FSM
Definition heapam.h:37
#define HEAP_INSERT_NO_LOGICAL
Definition heapam.h:39
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
Definition heaptoast.c:96
#define TOAST_TUPLE_THRESHOLD
Definition heaptoast.h:48
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
HeapTupleHeaderData * HeapTupleHeader
Definition htup.h:23
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
#define MaxHeapTupleSize
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition itemptr.h:135
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition itemptr.h:83
#define InvalidOffsetNumber
Definition off.h:26
uint16 OffsetNumber
Definition off.h:24
#define RelationGetTargetPageFreeSpace(relation, defaultff)
Definition rel.h:389
#define HEAP_DEFAULT_FILLFACTOR
Definition rel.h:360

References Assert, elog, ereport, errcode(), errmsg, ERROR, fb(), HEAP_DEFAULT_FILLFACTOR, heap_freetuple(), HEAP_INSERT_NO_LOGICAL, HEAP_INSERT_SKIP_FSM, heap_toast_insert_or_update(), HeapTupleHasExternal(), InvalidOffsetNumber, ItemPointerIsValid(), ItemPointerSet(), len, MAXALIGN, MaxHeapTupleSize, PageAddItem, PageGetHeapFreeSpace(), PageGetItem(), PageGetItemId(), PageInit(), RelationGetTargetPageFreeSpace, smgr_bulk_get_buf(), smgr_bulk_write(), and TOAST_TUPLE_THRESHOLD.

Referenced by end_heap_rewrite(), and rewrite_heap_tuple().

◆ rewrite_heap_dead_tuple()

bool rewrite_heap_dead_tuple ( RewriteState  state,
HeapTuple  old_tuple 
)

Definition at line 547 of file rewriteheap.c.

/*
 * Purpose (from the visible body): look up old_tuple's TID in the
 * rs_unresolved_tups hash; if an earlier tuple in the update chain had been
 * stashed there, remove that entry and return true (telling the caller the
 * stashed tuple is dead too), otherwise return false.
 *
 * NOTE(review): this is a Doxygen-rendered listing; lines whose whole content
 * was a hyperlinked identifier were dropped during extraction.  The numbering
 * gaps below (564-565, 569, 578) mark such lost lines.
 */
548{
549 /*
550 * If we have already seen an earlier tuple in the update chain that
551 * points to this tuple, let's forget about that earlier tuple. It's in
552 * fact dead as well, our simple xmax < OldestXmin test in
553 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
554 * when xmin of a tuple is greater than xmax, which sounds
555 * counter-intuitive but is perfectly valid.
556 *
557 * We don't bother to try to detect the situation the other way round,
558 * when we encounter the dead tuple first and then the recently dead one
559 * that points to it. If that happens, we'll have some unmatched entries
560 * in the UnresolvedTups hash table at the end. That can happen anyway,
561 * because a vacuum might have removed the dead tuple in the chain before
562 * us.
563 */
/* NOTE(review): lines 564-565 (local declarations of `hashkey` and
 * `unresolved`, presumably TidHashKey / UnresolvedTup per the page's data
 * structures) were dropped in extraction -- confirm against upstream. */
566 bool found;
567
568 memset(&hashkey, 0, sizeof(hashkey));
/* NOTE(review): line 569 dropped -- presumably sets hashkey.xmin via
 * HeapTupleHeaderGetXmin() (it appears in the References list below). */
570 hashkey.tid = old_tuple->t_self;
571
572 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
573 HASH_FIND, NULL);
574
575 if (unresolved != NULL)
576 {
577 /* Need to free the contained tuple as well as the hashtable entry */
/* NOTE(review): line 578 dropped -- presumably heap_freetuple(unresolved->tuple),
 * since heap_freetuple() is in the References list. */
579 hash_search(state->rs_unresolved_tups, &hashkey,
580 HASH_REMOVE, &found);
581 Assert(found);
582 return true;
583 }
584
585 return false;
586}
@ HASH_FIND
Definition hsearch.h:113
@ HASH_REMOVE
Definition hsearch.h:115

References Assert, fb(), HASH_FIND, HASH_REMOVE, hash_search(), heap_freetuple(), and HeapTupleHeaderGetXmin().

Referenced by heapam_relation_copy_for_cluster().

◆ rewrite_heap_tuple()

void rewrite_heap_tuple ( RewriteState  state,
HeapTuple  old_tuple,
HeapTuple  new_tuple 
)

Definition at line 342 of file rewriteheap.c.

/*
 * Purpose (from the visible body): copy old_tuple's visibility info onto
 * new_tuple, freeze eligible xmin/xmax, then insert new_tuple into the new
 * heap while maintaining ctid update chains across the rewrite via two hash
 * tables: rs_old_new_tid_map (old TID -> new TID, for chain members already
 * written) and rs_unresolved_tups (tuples stashed until their successor's new
 * TID is known).  The for(;;) loop drains chains of resolved predecessors.
 *
 * NOTE(review): Doxygen-rendered listing; numbering gaps are lines whose
 * content was a hyperlink and was dropped in extraction.  Where a gap's likely
 * content is named below, it is inferred from the References list on this
 * page -- confirm against the upstream file before relying on it.
 */
344{
/* NOTE(review): lines 345-348 partially dropped -- local declarations
 * (old_tid survives at 346; hashkey and a MemoryContext presumably lost). */
346 ItemPointerData old_tid;
348 bool found;
349 bool free_new;
350
/* NOTE(review): line 352 dropped -- presumably a MemoryContextSwitchTo()
 * into the rewrite state's context (MemoryContextSwitchTo is referenced). */
352
353 /*
354 * Copy the original tuple's visibility information into new_tuple.
355 *
356 * XXX we might later need to copy some t_infomask2 bits, too? Right now,
357 * we intentionally clear the HOT status bits.
358 */
359 memcpy(&new_tuple->t_data->t_choice.t_heap,
360 &old_tuple->t_data->t_choice.t_heap,
361 sizeof(HeapTupleFields));
362
363 new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
364 new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
365 new_tuple->t_data->t_infomask |=
366 old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
367
368 /*
369 * While we have our hands on the tuple, we may as well freeze any
370 * eligible xmin or xmax, so that future VACUUM effort can be saved.
371 */
/* NOTE(review): line 372 dropped -- the head of the heap_freeze_tuple()
 * call whose arguments follow on 373-376 (heap_freeze_tuple is referenced). */
373 state->rs_old_rel->rd_rel->relfrozenxid,
374 state->rs_old_rel->rd_rel->relminmxid,
375 state->rs_freeze_xid,
376 state->rs_cutoff_multi);
377
378 /*
379 * Invalid ctid means that ctid should point to the tuple itself. We'll
380 * override it later if the tuple is part of an update chain.
381 */
382 ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
383
384 /*
385 * If the tuple has been updated, check the old-to-new mapping hash table.
386 *
387 * Note that this check relies on the HeapTupleSatisfiesVacuum() in
388 * heapam_relation_copy_for_cluster() to have set hint bits.
389 */
390 if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
/* NOTE(review): lines 391-392 dropped -- additional disjuncts of this
 * condition, presumably HeapTupleHeaderIsOnlyLocked() and
 * HeapTupleHeaderIndicatesMovedPartitions() (both referenced below). */
393 !(ItemPointerEquals(&(old_tuple->t_self),
394 &(old_tuple->t_data->t_ctid))))
395 {
/* NOTE(review): line 396 dropped -- presumably OldToNewMapping mapping;
 * declaration (the variable is used at 403/406 below). */
397
398 memset(&hashkey, 0, sizeof(hashkey));
/* NOTE(review): line 399 dropped -- presumably sets hashkey.xmin from
 * HeapTupleHeaderGetUpdateXid(old_tuple->t_data) (referenced below). */
400 hashkey.tid = old_tuple->t_data->t_ctid;
401
/* NOTE(review): line 402 dropped -- head of the assignment whose tail is
 * at 403 (mapping = ...). */
403 hash_search(state->rs_old_new_tid_map, &hashkey,
404 HASH_FIND, NULL);
405
406 if (mapping != NULL)
407 {
408 /*
409 * We've already copied the tuple that t_ctid points to, so we can
410 * set the ctid of this tuple to point to the new location, and
411 * insert it right away.
412 */
413 new_tuple->t_data->t_ctid = mapping->new_tid;
414
415 /* We don't need the mapping entry anymore */
416 hash_search(state->rs_old_new_tid_map, &hashkey,
417 HASH_REMOVE, &found);
418 Assert(found);
419 }
420 else
421 {
422 /*
423 * We haven't seen the tuple t_ctid points to yet. Stash this
424 * tuple into unresolved_tups to be written later.
425 */
/* NOTE(review): line 426 dropped -- presumably UnresolvedTup unresolved;
 * declaration (used at 428 below). */
427
428 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
429 HASH_ENTER, &found);
430 Assert(!found);
431
432 unresolved->old_tid = old_tuple->t_self;
/* NOTE(review): line 433 dropped -- presumably
 * unresolved->tuple = heap_copytuple(new_tuple); (heap_copytuple is
 * referenced below). */
434
435 /*
436 * We can't do anything more now, since we don't know where the
437 * tuple will be written.
438 */
/* NOTE(review): line 439 dropped -- presumably a MemoryContextSwitchTo()
 * back to the caller's context before the early return. */
440 return;
441 }
442 }
443
444 /*
445 * Now we will write the tuple, and then check to see if it is the B tuple
446 * in any new or known pair. When we resolve a known pair, we will be
447 * able to write that pair's A tuple, and then we have to check if it
448 * resolves some other pair. Hence, we need a loop here.
449 */
450 old_tid = old_tuple->t_self;
451 free_new = false;
452
453 for (;;)
454 {
455 ItemPointerData new_tid;
456
457 /* Insert the tuple and find out where it's put in new_heap */
/* NOTE(review): line 458 dropped -- presumably
 * raw_heap_insert(state, new_tuple); (raw_heap_insert is referenced and
 * this comment announces the insert). */
459 new_tid = new_tuple->t_self;
460
/* NOTE(review): line 461 dropped -- presumably
 * logical_rewrite_heap_tuple(state, old_tid, new_tuple); (referenced). */
462
463 /*
464 * If the tuple is the updated version of a row, and the prior version
465 * wouldn't be DEAD yet, then we need to either resolve the prior
466 * version (if it's waiting in rs_unresolved_tups), or make an entry
467 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
468 * previous tuple's xmax would equal this one's xmin, so it's
469 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
470 */
471 if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
/* NOTE(review): line 472 dropped -- presumably
 * !TransactionIdPrecedes(HeapTupleHeaderGetXmin(...), completing the
 * condition continued at 473 (both names appear in the References list). */
473 state->rs_oldest_xmin))
474 {
475 /*
476 * Okay, this is B in an update pair. See if we've seen A.
477 */
/* NOTE(review): line 478 dropped -- presumably UnresolvedTup unresolved;
 * declaration (used at 481/484 below). */
479
480 memset(&hashkey, 0, sizeof(hashkey));
/* NOTE(review): line 481 dropped -- presumably sets hashkey.xmin from
 * HeapTupleHeaderGetXmin(new_tuple->t_data). */
482 hashkey.tid = old_tid;
483
484 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
485 HASH_FIND, NULL);
486
487 if (unresolved != NULL)
488 {
489 /*
490 * We have seen and memorized the previous tuple already. Now
491 * that we know where we inserted the tuple its t_ctid points
492 * to, fix its t_ctid and insert it to the new heap.
493 */
494 if (free_new)
/* NOTE(review): line 495 dropped -- presumably heap_freetuple(new_tuple);
 * the body of the if at 494 (heap_freetuple is referenced below). */
496 new_tuple = unresolved->tuple;
497 free_new = true;
498 old_tid = unresolved->old_tid;
499 new_tuple->t_data->t_ctid = new_tid;
500
501 /*
502 * We don't need the hash entry anymore, but don't free its
503 * tuple just yet.
504 */
505 hash_search(state->rs_unresolved_tups, &hashkey,
506 HASH_REMOVE, &found);
507 Assert(found);
508
509 /* loop back to insert the previous tuple in the chain */
510 continue;
511 }
512 else
513 {
514 /*
515 * Remember the new tid of this tuple. We'll use it to set the
516 * ctid when we find the previous tuple in the chain.
517 */
/* NOTE(review): line 518 dropped -- presumably OldToNewMapping mapping;
 * declaration (assigned at 520 below). */
519
520 mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
521 HASH_ENTER, &found);
522 Assert(!found);
523
524 mapping->new_tid = new_tid;
525 }
526 }
527
528 /* Done with this (chain of) tuples, for now */
529 if (free_new)
/* NOTE(review): line 530 dropped -- presumably heap_freetuple(new_tuple);
 * the body of the if at 529. */
531 break;
532 }
533
/* NOTE(review): line 534 dropped -- presumably MemoryContextSwitchTo()
 * restoring the caller's memory context before return. */
535}
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
Definition heapam.c:7501
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
HeapTuple heap_copytuple(HeapTuple tuple)
Definition heaptuple.c:698
#define HEAP_XACT_MASK
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
#define HEAP_XMAX_INVALID
#define HEAP_UPDATED
bool ItemPointerEquals(const ItemPointerData *pointer1, const ItemPointerData *pointer2)
Definition itemptr.c:35
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
OldToNewMappingData * OldToNewMapping

References Assert, fb(), HASH_ENTER, HASH_FIND, HASH_REMOVE, hash_search(), heap_copytuple(), heap_freetuple(), heap_freeze_tuple(), HEAP_UPDATED, HEAP_XACT_MASK, HEAP_XMAX_INVALID, HeapTupleHeaderGetUpdateXid(), HeapTupleHeaderGetXmin(), HeapTupleHeaderIndicatesMovedPartitions(), HeapTupleHeaderIsOnlyLocked(), ItemPointerEquals(), ItemPointerSetInvalid(), logical_rewrite_heap_tuple(), MemoryContextSwitchTo(), raw_heap_insert(), and TransactionIdPrecedes().

Referenced by reform_and_rewrite_tuple().