185#define IsSpecInsert(action) \
187 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
189#define IsSpecConfirmOrAbort(action) \
191 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
192 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
194#define IsInsertOrUpdate(action) \
196 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
197 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
198 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
305 bool addition,
Size sz);
328 memset(&hash_ctl, 0,
sizeof(hash_ctl));
457 if (txn->
gid != NULL)
611 alloc_len =
sizeof(
Oid) * nrelids;
635 bool *is_new,
XLogRecPtr lsn,
bool create_as_top)
709 Assert(!create || txn != NULL);
757 toptxn->
txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
768 toptxn->
txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
856 bool transactional,
const char *prefix,
857 Size message_size,
const char *message)
889 volatile Snapshot snapshot_now = snap;
901 rb->
message(rb, txn, lsn,
false, prefix, message_size, message);
925#ifdef USE_ASSERT_CHECKING
958 Assert(prev_first_lsn < cur_txn->first_lsn);
978 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
996#ifdef USE_ASSERT_CHECKING
1013 Assert(prev_lsn <= cur_change->lsn);
1015 prev_lsn = cur_change->
lsn;
1250 else if (pos_a == pos_b)
1305 state->nr_txns = nr_txns;
1308 for (off = 0; off <
state->nr_txns; off++)
1310 state->entries[off].file.vfd = -1;
1311 state->entries[off].segno = 0;
1320 *iter_state =
state;
1339 &
state->entries[off].segno);
1345 state->entries[off].lsn = cur_change->
lsn;
1346 state->entries[off].change = cur_change;
1347 state->entries[off].txn = txn;
1368 &
state->entries[off].file,
1369 &
state->entries[off].segno);
1374 state->entries[off].lsn = cur_change->
lsn;
1375 state->entries[off].change = cur_change;
1376 state->entries[off].txn = cur_txn;
1400 if (
state->heap->bh_size == 0)
1404 entry = &
state->entries[off];
1430 state->entries[off].lsn = next_change->
lsn;
1431 state->entries[off].change = next_change;
1454 &
state->entries[off].segno))
1461 elog(
DEBUG2,
"restored %u/%u changes from disk",
1467 state->entries[off].lsn = next_change->
lsn;
1468 state->entries[off].change = next_change;
1490 for (off = 0; off <
state->nr_txns; off++)
1492 if (
state->entries[off].file.vfd != -1)
2068 int nrelations,
Relation *relations,
2107 if (snapshot_now->
copied)
2159 if (specinsert != NULL)
2203 volatile bool stream_started =
false;
2227 int changes_count = 0;
2265 stream_started =
true;
2276 prev_lsn = change->
lsn;
2285 curtxn = change->
txn;
2298 if (specinsert == NULL)
2299 elog(
ERROR,
"invalid ordering of speculative insertion changes");
2301 change = specinsert;
2330 elog(
ERROR,
"could not map filenumber \"%s\" to relation OID",
2337 elog(
ERROR,
"could not open relation with OID %u (for filenumber \"%s\")",
2358 if (relation->
rd_rel->relkind == RELKIND_SEQUENCE)
2400 if (specinsert != NULL)
2429 if (specinsert != NULL)
2437 specinsert = change;
2450 if (specinsert != NULL)
2474 for (
i = 0;
i < nrelids;
i++)
2482 elog(
ERROR,
"could not open relation with OID %u", relid);
2487 relations[nrelations++] = rel;
2495 for (
i = 0;
i < nrelations;
i++)
2515 if (snapshot_now->
copied)
2546 if (command_id < change->
data.command_id)
2550 if (!snapshot_now->
copied)
2557 snapshot_now->
curcid = command_id;
2566 elog(
ERROR,
"tuplecid value in changequeue");
2580#define CHANGES_THRESHOLD 100
2619 stream_started =
false;
2631 rb->
prepare(rb, txn, commit_lsn);
2635 rb->
commit(rb, txn, commit_lsn);
2640 elog(
ERROR,
"output plugin used XID %u",
2649 else if (snapshot_now->
copied)
2731 if (errdata->
sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2752 command_id, prev_lsn,
2852 origin_id, origin_lsn);
2958 XLogRecPtr origin_lsn,
char *gid,
bool is_commit)
2974 prepare_end_lsn = txn->
end_lsn;
2988 if ((txn->
final_lsn < two_phase_at) && is_commit)
3222 for (
i = 0;
i < ninvalidations;
i++)
3333 bool addition,
Size sz)
3501 for (
i = 0;
i < nmsgs;
i++)
3568 xids[xcnt++] = txn->
xid;
3706 Size largest_size = 0;
3728 if ((largest == NULL || txn->
total_size > largest_size) &&
3842 elog(
DEBUG2,
"spill %u changes in XID %u to disk",
3884 O_CREAT | O_WRONLY | O_APPEND |
PG_BINARY);
3889 errmsg(
"could not open file \"%s\": %m", path)));
3959 oldlen = oldtup->
t_len;
3966 newlen = newtup->
t_len;
4011 memcpy(
data, &prefix_size,
sizeof(
Size));
4015 data += prefix_size;
4114 int save_errno = errno;
4119 errno = save_errno ? save_errno : ENOSPC;
4122 errmsg(
"could not write to data file for XID %u: %m",
4183 bool txn_is_streamed;
4318 oldlen = oldtup->
t_len;
4325 newlen = newtup->
t_len;
4434 if (*
fd < 0 && errno == ENOENT)
4443 errmsg(
"could not open file \"%s\": %m",
4455 file->
curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4465 else if (readBytes < 0)
4468 errmsg(
"could not read from reorderbuffer spill file: %m")));
4472 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4488 WAIT_EVENT_REORDER_BUFFER_READ);
4493 errmsg(
"could not read from reorderbuffer spill file: %m")));
4497 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4598 memcpy(&prefix_size,
data,
sizeof(
Size));
4604 data += prefix_size;
4646 memcpy(newsnap,
data, size);
4708 if (unlink(path) != 0 && errno != ENOENT)
4711 errmsg(
"could not remove file \"%s\": %m", path)));
4724 struct stat statbuf;
4737 if (strncmp(spill_de->
d_name,
"xid", 3) == 0)
4743 if (unlink(path) != 0)
4746 errmsg(
"could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4780 struct dirent *logical_de;
4785 if (strcmp(logical_de->
d_name,
".") == 0 ||
4786 strcmp(logical_de->
d_name,
"..") == 0)
4868 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq 0",
4869 chunk_seq, chunk_id);
4872 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq %d",
4885 elog(
ERROR,
"unexpected type of toast chunk");
4887 ent->
size += chunksize;
4954 elog(
ERROR,
"could not open toast relation with OID %u (base relation \"%s\")",
4968 for (natt = 0; natt < desc->
natts; natt++)
4977 struct varlena *new_datum = NULL;
4978 struct varlena *reconstructed;
4983 if (attr->attnum < 0)
4986 if (attr->attisdropped)
4990 if (attr->attlen != -1)
5042 memcpy(
VARDATA(reconstructed) + data_done,
5055 memset(&redirect_pointer, 0,
sizeof(redirect_pointer));
5056 redirect_pointer.
pointer = reconstructed;
5060 sizeof(redirect_pointer));
5083 for (natt = 0; natt < desc->
natts; natt++)
5184 elog(
DEBUG3,
"mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5216 errmsg(
"could not open file \"%s\": %m", path)));
5236 errmsg(
"could not read file \"%s\": %m",
5238 else if (readBytes == 0)
5243 errmsg(
"could not read from file \"%s\": read %d instead of %d bytes",
5288 errmsg(
"could not close file \"%s\": %m", path)));
5298 return bsearch(&xid, xip, num,
5322 struct dirent *mapping_de;
5339 if (strcmp(mapping_de->
d_name,
".") == 0 ||
5340 strcmp(mapping_de->
d_name,
"..") == 0)
5344 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
5348 &f_dboid, &f_relid, &f_hi, &f_lo,
5349 &f_mapped_xid, &f_create_xid) != 6)
5352 f_lsn = ((
uint64) f_hi) << 32 | f_lo;
5355 if (f_dboid != dboid)
5359 if (f_relid != relid)
5381 foreach(file, files)
5406 bool updated_mapping =
false;
5420 memset(&
key, 0,
sizeof(
key));
5447 if (ent == NULL && !updated_mapping)
5451 updated_mapping =
true;
5454 else if (ent == NULL)
void binaryheap_build(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
bh_node_type binaryheap_first(binaryheap *heap)
bh_node_type binaryheap_remove_first(binaryheap *heap)
void binaryheap_free(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
static void cleanup(void)
#define BufferIsLocal(buffer)
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
#define FLEXIBLE_ARRAY_MEMBER
bool IsToastRelation(Relation relation)
bool IsSharedRelation(Oid relationId)
#define INDIRECT_POINTER_SIZE
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
void * hash_seq_search(HASH_SEQ_STATUS *status)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
void FreeErrorData(ErrorData *edata)
int errcode_for_file_access(void)
ErrorData * CopyErrorData(void)
void FlushErrorState(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int CloseTransientFile(int fd)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
HeapTupleData * HeapTuple
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
#define SizeofHeapTupleHeader
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
static dlist_node * dlist_pop_head_node(dlist_head *head)
#define dlist_foreach(iter, lhead)
static void dlist_init(dlist_head *head)
#define dclist_container(type, membername, ptr)
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
#define dlist_head_element(type, membername, lhead)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
static void dlist_delete(dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
#define dlist_foreach_modify(iter, lhead)
static bool dlist_is_empty(const dlist_head *head)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dlist_container(type, membername, ptr)
#define dclist_foreach(iter, lhead)
static int pg_cmp_u64(uint64 a, uint64 b)
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
if(TABLE==NULL||TABLE_index==NULL)
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
List * lappend(List *list, void *datum)
void list_sort(List *list, list_sort_comparator cmp)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
char * pstrdup(const char *in)
void * repalloc(void *pointer, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define SLAB_DEFAULT_BLOCK_SIZE
#define CHECK_FOR_INTERRUPTS()
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Oid DatumGetObjectId(Datum X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
static int32 DatumGetInt32(Datum X)
static int fd(const char *x, int i)
bool TransactionIdIsInProgress(TransactionId xid)
#define RelationIsLogicallyLogged(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationIsValid(relation)
Relation RelationIdGetRelation(Oid relationId)
void RelationClose(Relation relation)
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
#define relpathperm(rlocator, forknum)
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferDiskChange ReorderBufferDiskChange
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
#define IsSpecInsert(action)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
ReorderBuffer * ReorderBufferAllocate(void)
int logical_decoding_work_mem
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
int debug_logical_replication_streaming
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define IsInsertOrUpdate(action)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
struct RewriteMappingFile RewriteMappingFile
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
static void SetupCheckXidLive(TransactionId xid)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
struct TXNEntryFile TXNEntryFile
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
struct ReorderBufferToastEnt ReorderBufferToastEnt
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
static const Size max_changes_in_memory
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
#define rbtxn_is_committed(txn)
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_catalog_changes(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
#define RBTXN_PREPARE_STATUS_MASK
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_STREAMED
#define rbtxn_is_prepared(txn)
#define RBTXN_HAS_PARTIAL_CHANGE
#define rbtxn_is_streamed(txn)
struct ReorderBufferChange ReorderBufferChange
#define RBTXN_SENT_PREPARE
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_known_subxact(txn)
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES
#define RBTXN_IS_COMMITTED
#define PG_LOGICAL_MAPPINGS_DIR
#define RBTXN_IS_SERIALIZED_CLEAR
#define rbtxn_sent_prepare(txn)
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
#define RBTXN_HAS_STREAMABLE_CHANGE
@ REORDER_BUFFER_CHANGE_INVALIDATION
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_MESSAGE
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
@ REORDER_BUFFER_CHANGE_TRUNCATE
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
@ REORDER_BUFFER_CHANGE_UPDATE
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_SERIALIZED
#define rbtxn_is_serialized(txn)
#define rbtxn_has_partial_change(txn)
#define LOGICAL_REWRITE_FORMAT
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
ReplicationSlot * MyReplicationSlot
bool ReplicationSlotValidateName(const char *name, int elevel)
void SnapBuildSnapDecRefcount(Snapshot snap)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
void TeardownHistoricSnapshot(bool is_error)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
static HTAB * tuplecid_data
struct SnapshotData * Snapshot
struct SnapshotData SnapshotData
struct SnapBuild * snapshot_builder
RelFileLocator old_locator
RelFileLocator new_locator
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
bool clear_toast_afterwards
union ReorderBufferChange::@110 data
struct ReorderBufferTXN * txn
struct ReorderBufferChange::@110::@111 tp
struct ReorderBufferChange::@110::@114 tuplecid
struct ReorderBufferChange::@110::@113 msg
struct ReorderBufferChange::@110::@115 inval
SharedInvalidationMessage * invalidations
ReorderBufferChange change
ReorderBufferChange * change
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
XLogRecPtr restart_decoding_lsn
pairingheap_node txn_node
XLogRecPtr base_snapshot_lsn
TransactionId toplevel_xid
dlist_node catchange_node
SharedInvalidationMessage * invalidations
struct ReorderBufferTXN * toptxn
void * output_plugin_private
dlist_node base_snapshot_node
union ReorderBufferTXN::@116 xact_time
struct varlena * reconstructed
ReorderBufferTupleCidKey key
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
bool TransactionIdDidCommit(TransactionId transactionId)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
#define VARSIZE_SHORT(PTR)
#define VARATT_IS_EXTENDED(PTR)
#define VARATT_IS_SHORT(PTR)
#define SET_VARSIZE_COMPRESSED(PTR, len)
#define SET_VARTAG_EXTERNAL(PTR, tag)
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
#define VARDATA_EXTERNAL(PTR)
#define SET_VARSIZE(PTR, len)
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
#define VARATT_IS_EXTERNAL(PTR)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool IsTransactionOrTransactionBlock(void)
void BeginInternalSubTransaction(const char *name)
TransactionId CheckXidAlive
void RollbackAndReleaseCurrentSubTransaction(void)
void StartTransactionCommand(void)
TransactionId GetCurrentTransactionIdIfAny(void)
TransactionId GetCurrentTransactionId(void)
void AbortCurrentTransaction(void)
int xidComparator(const void *arg1, const void *arg2)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr