270 state->rs_unresolved_tups =
278 state->rs_old_new_tid_map =
315 if (
state->rs_buffer)
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
472 state->rs_oldest_xmin))
643 errmsg(
"row is too big: size %zu, maximum size %zu",
771 state->rs_logical_rewrite =
774 if (!
state->rs_logical_rewrite)
786 state->rs_logical_rewrite =
false;
792 state->rs_num_rewrite_mappings = 0;
798 state->rs_logical_mappings =
818 if (
state->rs_num_rewrite_mappings == 0)
821 elog(
DEBUG1,
"flushing %u logical rewrite mapping entries",
822 state->rs_num_rewrite_mappings);
836 if (num_mappings == 0)
839 if (
state->rs_old_rel->rd_rel->relisshared)
844 xlrec.num_mappings = num_mappings;
847 xlrec.mapped_db = dboid;
872 state->rs_num_rewrite_mappings--;
887 errmsg(
"could not write to file \"%s\", wrote %d of %d: %m", src->
path,
913 if (!
state->rs_logical_rewrite)
917 if (
state->rs_num_rewrite_mappings > 0)
927 errmsg(
"could not fsync file \"%s\": %m", src->
path)));
960 if (
state->rs_old_rel->rd_rel->relisshared)
979 errmsg(
"could not create file \"%s\": %m", path)));
986 state->rs_num_rewrite_mappings++;
992 if (
state->rs_num_rewrite_mappings >= 1000 )
1013 if (!
state->rs_logical_rewrite)
1096 errmsg(
"could not create file \"%s\": %m", path)));
1106 errmsg(
"could not truncate file \"%s\" to %u: %m",
1124 errmsg(
"could not write to file \"%s\": %m", path)));
1137 errmsg(
"could not fsync file \"%s\": %m", path)));
1143 errmsg(
"could not close file \"%s\": %m", path)));
1208 lsn = ((
uint64) hi) << 32 | lo;
1212 elog(
DEBUG1,
"removing logical rewrite file \"%s\"", path);
1216 errmsg(
"could not remove file \"%s\": %m", path)));
1231 errmsg(
"could not open file \"%s\": %m", path)));
1242 errmsg(
"could not fsync file \"%s\": %m", path)));
1248 errmsg(
"could not close file \"%s\": %m", path)));
#define RelationGetNumberOfBlocks(reln)
Size PageGetHeapFreeSpace(const PageData *page)
void PageInit(Page page, Size pageSize, Size specialSize)
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
static void * PageGetItem(PageData *page, const ItemIdData *itemId)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
void smgr_bulk_finish(BulkWriteState *bulkstate)
#define Assert(condition)
TransactionId MultiXactId
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int FileSync(File file, uint32 wait_event_info)
int CloseTransientFile(int fd)
void FileClose(File file)
void fsync_fname(const char *fname, bool isdir)
int data_sync_elevel(int elevel)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileWrite(File file, const void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
#define palloc0_object(type)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
#define HEAP_INSERT_SKIP_FSM
#define HEAP_INSERT_NO_LOGICAL
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define XLOG_HEAP2_REWRITE
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
#define TOAST_TUPLE_THRESHOLD
HeapTuple heap_copytuple(HeapTuple tuple)
void heap_freetuple(HeapTuple htup)
HeapTupleHeaderData * HeapTupleHeader
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
#define HEAP_XMAX_INVALID
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
#define dclist_container(type, membername, ptr)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dclist_foreach_modify(iter, lhead)
bool ItemPointerEquals(const ItemPointerData *pointer1, const ItemPointerData *pointer2)
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
static void ItemPointerSetInvalid(ItemPointerData *pointer)
static bool ItemPointerIsValid(const ItemPointerData *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define InvalidOffsetNumber
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int fd(const char *x, int i)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
#define RelationGetRelid(relation)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
#define RelationIsAccessibleInLogicalDecoding(relation)
#define HEAP_DEFAULT_FILLFACTOR
#define PG_LOGICAL_MAPPINGS_DIR
static void raw_heap_insert(RewriteState state, HeapTuple tup)
void end_heap_rewrite(RewriteState state)
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
UnresolvedTupData * UnresolvedTup
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
static void logical_begin_heap_rewrite(RewriteState state)
void CheckPointLogicalRewriteHeap(void)
struct RewriteMappingFile RewriteMappingFile
static void logical_end_heap_rewrite(RewriteState state)
OldToNewMappingData * OldToNewMapping
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
#define LOGICAL_REWRITE_FORMAT
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
RelFileLocator old_locator
RelFileLocator new_locator
LogicalRewriteMappingData map
TransactionId rs_freeze_xid
TransactionId rs_oldest_xmin
HTAB * rs_logical_mappings
HTAB * rs_unresolved_tups
uint32 rs_num_rewrite_mappings
TransactionId rs_logical_xmin
BulkWriteState * rs_bulkstate
BulkWriteBuffer rs_buffer
HTAB * rs_old_new_tid_map
MultiXactId rs_cutoff_multi
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsNormal(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
TransactionId GetCurrentTransactionId(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetXLogInsertRecPtr(void)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)