254 state->rs_old_rel = old_heap;
255 state->rs_new_rel = new_heap;
256 state->rs_buffer = NULL;
259 state->rs_oldest_xmin = oldest_xmin;
260 state->rs_freeze_xid = freeze_xid;
261 state->rs_cutoff_multi = cutoff_multi;
262 state->rs_cxt = rw_cxt;
270 state->rs_unresolved_tups =
278 state->rs_old_new_tid_map =
315 if (
state->rs_buffer)
318 state->rs_buffer = NULL;
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
394 memset(&hashkey, 0,
sizeof(hashkey));
446 old_tid = old_tuple->
t_self;
455 new_tid = new_tuple->
t_self;
469 state->rs_oldest_xmin))
476 memset(&hashkey, 0,
sizeof(hashkey));
478 hashkey.
tid = old_tid;
483 if (unresolved != NULL)
492 new_tuple = unresolved->
tuple;
564 memset(&hashkey, 0,
sizeof(hashkey));
571 if (unresolved != NULL)
609 if (
state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
639 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
640 errmsg(
"row is too big: size %zu, maximum size %zu",
653 if (
len + saveFreeSpace > pageFreeSpace)
661 state->rs_buffer = NULL;
769 state->rs_logical_rewrite =
772 if (!
state->rs_logical_rewrite)
784 state->rs_logical_rewrite =
false;
788 state->rs_logical_xmin = logical_xmin;
790 state->rs_num_rewrite_mappings = 0;
796 state->rs_logical_mappings =
816 if (
state->rs_num_rewrite_mappings == 0)
819 elog(
DEBUG1,
"flushing %u logical rewrite mapping entries",
820 state->rs_num_rewrite_mappings);
834 if (num_mappings == 0)
837 if (
state->rs_old_rel->rd_rel->relisshared)
862 memcpy(waldata, &pmap->
map,
sizeof(pmap->
map));
863 waldata +=
sizeof(pmap->
map);
870 state->rs_num_rewrite_mappings--;
881 WAIT_EVENT_LOGICAL_REWRITE_WRITE);
885 errmsg(
"could not write to file \"%s\", wrote %d of %d: %m", src->
path,
896 pfree(waldata_start);
911 if (!
state->rs_logical_rewrite)
915 if (
state->rs_num_rewrite_mappings > 0)
922 if (
FileSync(src->
vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
925 errmsg(
"could not fsync file \"%s\": %m", src->
path)));
958 if (
state->rs_old_rel->rd_rel->relisshared)
971 memcpy(src->
path, path,
sizeof(path));
973 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
977 errmsg(
"could not create file \"%s\": %m", path)));
984 state->rs_num_rewrite_mappings++;
990 if (
state->rs_num_rewrite_mappings >= 1000 )
1006 bool do_log_xmin =
false;
1007 bool do_log_xmax =
false;
1011 if (!
state->rs_logical_rewrite)
1042 if (!do_log_xmin && !do_log_xmax)
1094 errmsg(
"could not create file \"%s\": %m", path)));
1104 errmsg(
"could not truncate file \"%s\" to %u: %m",
1122 errmsg(
"could not write to file \"%s\": %m", path)));
1135 errmsg(
"could not fsync file \"%s\": %m", path)));
1141 errmsg(
"could not close file \"%s\": %m", path)));
1160 struct dirent *mapping_de;
1188 if (strcmp(mapping_de->
d_name,
".") == 0 ||
1189 strcmp(mapping_de->
d_name,
"..") == 0)
1199 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
1203 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1206 lsn = ((
uint64) hi) << 32 | lo;
1210 elog(
DEBUG1,
"removing logical rewrite file \"%s\"", path);
1211 if (unlink(path) < 0)
1214 errmsg(
"could not remove file \"%s\": %m", path)));
1229 errmsg(
"could not open file \"%s\": %m", path)));
1240 errmsg(
"could not fsync file \"%s\": %m", path)));
1246 errmsg(
"could not close file \"%s\": %m", path)));
#define RelationGetNumberOfBlocks(reln)
Size PageGetHeapFreeSpace(Page page)
void PageInit(Page page, Size pageSize, Size specialSize)
static Item PageGetItem(Page page, ItemId itemId)
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
void smgr_bulk_finish(BulkWriteState *bulkstate)
#define Assert(condition)
TransactionId MultiXactId
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void * hash_seq_search(HASH_SEQ_STATUS *status)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int FileSync(File file, uint32 wait_event_info)
int CloseTransientFile(int fd)
void FileClose(File file)
void fsync_fname(const char *fname, bool isdir)
int data_sync_elevel(int elevel)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
#define HEAP_INSERT_SKIP_FSM
#define HEAP_INSERT_NO_LOGICAL
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define XLOG_HEAP2_REWRITE
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
#define TOAST_TUPLE_THRESHOLD
HeapTuple heap_copytuple(HeapTuple tuple)
void heap_freetuple(HeapTuple htup)
HeapTupleHeaderData * HeapTupleHeader
#define HEAP_XMAX_IS_LOCKED_ONLY(infomask)
#define HeapTupleHeaderIndicatesMovedPartitions(tup)
#define HeapTupleHeaderGetXmin(tup)
#define HeapTupleHasExternal(tuple)
#define HEAP_XMAX_INVALID
#define HeapTupleHeaderGetUpdateXid(tup)
#define dclist_container(type, membername, ptr)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dclist_foreach_modify(iter, lhead)
if(TABLE==NULL||TABLE_index==NULL)
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
static void ItemPointerSetInvalid(ItemPointerData *pointer)
static bool ItemPointerIsValid(const ItemPointerData *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define InvalidOffsetNumber
static int fd(const char *x, int i)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelid(relation)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
#define RelationIsAccessibleInLogicalDecoding(relation)
#define HEAP_DEFAULT_FILLFACTOR
#define PG_LOGICAL_MAPPINGS_DIR
struct RewriteMappingDataEntry RewriteMappingDataEntry
static void raw_heap_insert(RewriteState state, HeapTuple tup)
void end_heap_rewrite(RewriteState state)
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
UnresolvedTupData * UnresolvedTup
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
static void logical_begin_heap_rewrite(RewriteState state)
void CheckPointLogicalRewriteHeap(void)
struct RewriteMappingFile RewriteMappingFile
static void logical_end_heap_rewrite(RewriteState state)
OldToNewMappingData * OldToNewMapping
struct RewriteStateData RewriteStateData
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
#define LOGICAL_REWRITE_FORMAT
struct LogicalRewriteMappingData LogicalRewriteMappingData
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
RelFileLocator old_locator
RelFileLocator new_locator
LogicalRewriteMappingData map
TransactionId rs_freeze_xid
TransactionId rs_oldest_xmin
HTAB * rs_logical_mappings
HTAB * rs_unresolved_tups
uint32 rs_num_rewrite_mappings
TransactionId rs_logical_xmin
BulkWriteState * rs_bulkstate
BulkWriteBuffer rs_buffer
HTAB * rs_old_new_tid_map
MultiXactId rs_cutoff_multi
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsNormal(xid)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
TransactionId GetCurrentTransactionId(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetXLogInsertRecPtr(void)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const char *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)