256 state->rs_old_rel = old_heap;
257 state->rs_new_rel = new_heap;
261 state->rs_buffer_valid =
false;
262 state->rs_oldest_xmin = oldest_xmin;
263 state->rs_freeze_xid = freeze_xid;
264 state->rs_cutoff_multi = cutoff_multi;
265 state->rs_cxt = rw_cxt;
272 state->rs_unresolved_tups =
280 state->rs_old_new_tid_map =
317 if (
state->rs_buffer_valid)
391 state->rs_old_rel->rd_rel->relfrozenxid,
392 state->rs_old_rel->rd_rel->relminmxid,
393 state->rs_freeze_xid,
394 state->rs_cutoff_multi);
413 memset(&hashkey, 0,
sizeof(hashkey));
465 old_tid = old_tuple->
t_self;
474 new_tid = new_tuple->
t_self;
488 state->rs_oldest_xmin))
495 memset(&hashkey, 0,
sizeof(hashkey));
497 hashkey.
tid = old_tid;
502 if (unresolved != NULL)
511 new_tuple = unresolved->
tuple;
583 memset(&hashkey, 0,
sizeof(hashkey));
590 if (unresolved != NULL)
628 if (
state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
658 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
659 errmsg(
"row is too big: size %zu, maximum size %zu",
667 if (
state->rs_buffer_valid)
671 if (
len + saveFreeSpace > pageFreeSpace)
695 state->rs_blockno, page,
true);
698 state->rs_buffer_valid =
false;
702 if (!
state->rs_buffer_valid)
706 state->rs_buffer_valid =
true;
803 state->rs_logical_rewrite =
806 if (!
state->rs_logical_rewrite)
818 state->rs_logical_rewrite =
false;
822 state->rs_logical_xmin = logical_xmin;
824 state->rs_num_rewrite_mappings = 0;
830 state->rs_logical_mappings =
850 if (
state->rs_num_rewrite_mappings == 0)
853 elog(
DEBUG1,
"flushing %u logical rewrite mapping entries",
854 state->rs_num_rewrite_mappings);
868 if (num_mappings == 0)
871 if (
state->rs_old_rel->rd_rel->relisshared)
896 memcpy(waldata, &pmap->
map,
sizeof(pmap->
map));
897 waldata +=
sizeof(pmap->
map);
904 state->rs_num_rewrite_mappings--;
915 WAIT_EVENT_LOGICAL_REWRITE_WRITE);
919 errmsg(
"could not write to file \"%s\", wrote %d of %d: %m", src->
path,
930 pfree(waldata_start);
945 if (!
state->rs_logical_rewrite)
949 if (
state->rs_num_rewrite_mappings > 0)
956 if (
FileSync(src->
vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
959 errmsg(
"could not fsync file \"%s\": %m", src->
path)));
992 if (
state->rs_old_rel->rd_rel->relisshared)
1005 memcpy(src->
path, path,
sizeof(path));
1007 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
1011 errmsg(
"could not create file \"%s\": %m", path)));
1018 state->rs_num_rewrite_mappings++;
1024 if (
state->rs_num_rewrite_mappings >= 1000 )
1040 bool do_log_xmin =
false;
1041 bool do_log_xmax =
false;
1045 if (!
state->rs_logical_rewrite)
1076 if (!do_log_xmin && !do_log_xmax)
1128 errmsg(
"could not create file \"%s\": %m", path)));
1138 errmsg(
"could not truncate file \"%s\" to %u: %m",
1156 errmsg(
"could not write to file \"%s\": %m", path)));
1169 errmsg(
"could not fsync file \"%s\": %m", path)));
1175 errmsg(
"could not close file \"%s\": %m", path)));
1194 struct dirent *mapping_de;
1210 mappings_dir =
AllocateDir(
"pg_logical/mappings");
1211 while ((mapping_de =
ReadDir(mappings_dir,
"pg_logical/mappings")) != NULL)
1222 if (strcmp(mapping_de->
d_name,
".") == 0 ||
1223 strcmp(mapping_de->
d_name,
"..") == 0)
1226 snprintf(path,
sizeof(path),
"pg_logical/mappings/%s", mapping_de->
d_name);
1233 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
1237 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1240 lsn = ((uint64) hi) << 32 | lo;
1244 elog(
DEBUG1,
"removing logical rewrite file \"%s\"", path);
1245 if (unlink(path) < 0)
1248 errmsg(
"could not remove file \"%s\": %m", path)));
1263 errmsg(
"could not open file \"%s\": %m", path)));
1274 errmsg(
"could not fsync file \"%s\": %m", path)));
1280 errmsg(
"could not close file \"%s\": %m", path)));
#define RelationGetNumberOfBlocks(reln)
Size PageGetHeapFreeSpace(Page page)
void PageSetChecksumInplace(Page page, BlockNumber blkno)
void PageInit(Page page, Size pageSize, Size specialSize)
static Item PageGetItem(Page page, ItemId itemId)
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
TransactionId MultiXactId
elog(ERROR, "%s: %s", p2, msg)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int FileSync(File file, uint32 wait_event_info)
int CloseTransientFile(int fd)
void FileClose(File file)
void fsync_fname(const char *fname, bool isdir)
int data_sync_elevel(int elevel)
File PathNameOpenFile(const char *fileName, int fileFlags)
int FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
#define HEAP_INSERT_SKIP_FSM
#define HEAP_INSERT_NO_LOGICAL
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define XLOG_HEAP2_REWRITE
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
#define TOAST_TUPLE_THRESHOLD
HeapTuple heap_copytuple(HeapTuple tuple)
void heap_freetuple(HeapTuple htup)
HeapTupleHeaderData * HeapTupleHeader
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
#define HeapTupleHeaderGetXmin(tup)
#define HeapTupleHasExternal(tuple)
#define HEAP_XMAX_INVALID
#define HeapTupleHeaderGetUpdateXid(tup)
#define dclist_container(type, membername, ptr)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dclist_foreach_modify(iter, lhead)
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
static void ItemPointerSetInvalid(ItemPointerData *pointer)
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
void * palloc_aligned(Size size, Size alignto, int flags)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define InvalidOffsetNumber
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int fd(const char *x, int i)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
#define RelationGetRelid(relation)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
static SMgrRelation RelationGetSmgr(Relation rel)
#define RelationIsAccessibleInLogicalDecoding(relation)
#define RelationNeedsWAL(relation)
#define HEAP_DEFAULT_FILLFACTOR
struct RewriteMappingDataEntry RewriteMappingDataEntry
static void raw_heap_insert(RewriteState state, HeapTuple tup)
void end_heap_rewrite(RewriteState state)
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
UnresolvedTupData * UnresolvedTup
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
static void logical_begin_heap_rewrite(RewriteState state)
void CheckPointLogicalRewriteHeap(void)
struct RewriteMappingFile RewriteMappingFile
static void logical_end_heap_rewrite(RewriteState state)
OldToNewMappingData * OldToNewMapping
struct RewriteStateData RewriteStateData
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
#define LOGICAL_REWRITE_FORMAT
struct LogicalRewriteMappingData LogicalRewriteMappingData
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
RelFileLocator old_locator
RelFileLocator new_locator
LogicalRewriteMappingData map
TransactionId rs_freeze_xid
TransactionId rs_oldest_xmin
HTAB * rs_logical_mappings
HTAB * rs_unresolved_tups
uint32 rs_num_rewrite_mappings
TransactionId rs_logical_xmin
HTAB * rs_old_new_tid_map
MultiXactId rs_cutoff_multi
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsNormal(xid)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
TransactionId GetCurrentTransactionId(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetXLogInsertRecPtr(void)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogRegisterData(char *data, uint32 len)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
XLogRecPtr log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno, Page page, bool page_std)
void XLogBeginInsert(void)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)