181 #define IsSpecInsert(action) \
183 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
185 #define IsSpecConfirmOrAbort(action) \
187 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
188 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
190 #define IsInsertOrUpdate(action) \
192 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
193 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
194 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
294 bool addition,
Size sz);
317 memset(&hash_ctl, 0,
sizeof(hash_ctl));
437 if (txn->
gid != NULL)
495 if (change->
data.
tp.newtuple)
498 change->
data.
tp.newtuple = NULL;
501 if (change->
data.
tp.oldtuple)
504 change->
data.
tp.oldtuple = NULL;
508 if (change->
data.
msg.prefix != NULL)
510 change->
data.
msg.prefix = NULL;
511 if (change->
data.
msg.message != NULL)
513 change->
data.
msg.message = NULL;
560 MAXIMUM_ALIGNOF + alloc_len);
591 alloc_len =
sizeof(
Oid) * nrelids;
615 bool *is_new,
XLogRecPtr lsn,
bool create_as_top)
689 Assert(!create || txn != NULL);
739 change->
data.
tp.clear_toast_afterwards)
821 bool transactional,
const char *prefix,
822 Size message_size,
const char *message)
836 change->
data.
msg.message_size = message_size;
838 memcpy(change->
data.
msg.message, message, message_size);
847 volatile Snapshot snapshot_now = snapshot;
856 rb->
message(rb, txn, lsn,
false, prefix, message_size, message);
880 #ifdef USE_ASSERT_CHECKING
899 Assert(prev_first_lsn < cur_txn->first_lsn);
919 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
937 #ifdef USE_ASSERT_CHECKING
954 Assert(prev_lsn <= cur_change->lsn);
956 prev_lsn = cur_change->
lsn;
1191 else if (pos_a == pos_b)
1246 state->nr_txns = nr_txns;
1249 for (off = 0; off <
state->nr_txns; off++)
1251 state->entries[off].file.vfd = -1;
1252 state->entries[off].segno = 0;
1261 *iter_state =
state;
1280 &
state->entries[off].segno);
1286 state->entries[off].lsn = cur_change->
lsn;
1287 state->entries[off].change = cur_change;
1288 state->entries[off].txn = txn;
1309 &
state->entries[off].file,
1310 &
state->entries[off].segno);
1315 state->entries[off].lsn = cur_change->
lsn;
1316 state->entries[off].change = cur_change;
1317 state->entries[off].txn = cur_txn;
1341 if (
state->heap->bh_size == 0)
1345 entry = &
state->entries[off];
1371 state->entries[off].lsn = next_change->
lsn;
1372 state->entries[off].change = next_change;
1395 &
state->entries[off].segno))
1402 elog(
DEBUG2,
"restored %u/%u changes from disk",
1408 state->entries[off].lsn = next_change->
lsn;
1409 state->entries[off].change = next_change;
1431 for (off = 0; off <
state->nr_txns; off++)
1433 if (
state->entries[off].file.vfd != -1)
1932 int nrelations,
Relation *relations,
1951 change->
data.
msg.message_size,
1956 change->
data.
msg.message_size,
1971 if (snapshot_now->
copied)
1999 if (specinsert != NULL)
2040 volatile bool stream_started =
false;
2098 stream_started =
true;
2109 prev_lsn = change->
lsn;
2118 curtxn = change->
txn;
2131 if (specinsert == NULL)
2132 elog(
ERROR,
"invalid ordering of speculative insertion changes");
2134 change = specinsert;
2144 change->
data.
tp.relnode.relNode);
2159 change->
data.
tp.newtuple == NULL &&
2160 change->
data.
tp.oldtuple == NULL)
2163 elog(
ERROR,
"could not map filenode \"%s\" to relation OID",
2170 elog(
ERROR,
"could not open relation with OID %u (for filenode \"%s\")",
2191 if (relation->
rd_rel->relkind == RELKIND_SEQUENCE)
2206 if (change->
data.
tp.clear_toast_afterwards)
2233 if (specinsert != NULL)
2262 if (specinsert != NULL)
2270 specinsert = change;
2283 if (specinsert != NULL)
2307 for (
i = 0;
i < nrelids;
i++)
2315 elog(
ERROR,
"could not open relation with OID %u", relid);
2320 relations[nrelations++] = relation;
2328 for (
i = 0;
i < nrelations;
i++)
2348 if (snapshot_now->
copied)
2379 if (command_id < change->
data.command_id)
2383 if (!snapshot_now->
copied)
2390 snapshot_now->
curcid = command_id;
2399 elog(
ERROR,
"tuplecid value in changequeue");
2434 stream_started =
false;
2444 rb->
prepare(rb, txn, commit_lsn);
2446 rb->
commit(rb, txn, commit_lsn);
2451 elog(
ERROR,
"output plugin used XID %u",
2460 else if (snapshot_now->
copied)
2539 if (errdata->
sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2553 command_id, prev_lsn,
2653 origin_id, origin_lsn);
2748 XLogRecPtr origin_lsn,
char *gid,
bool is_commit)
2764 prepare_end_lsn = txn->
end_lsn;
2778 if ((txn->
final_lsn < two_phase_at) && is_commit)
3005 for (
i = 0;
i < ninvalidations;
i++)
3112 bool addition,
Size sz)
3246 change->
data.
inval.ninvalidations = nmsgs;
3249 memcpy(change->
data.
inval.invalidations, msgs,
3266 for (
i = 0;
i < nmsgs;
i++)
3383 if ((!largest) || (txn->
size > largest->
size))
3421 Size largest_size = 0;
3436 if ((largest == NULL || txn->
total_size > largest_size) &&
3532 elog(
DEBUG2,
"spill %u changes in XID %u to disk",
3574 O_CREAT | O_WRONLY | O_APPEND |
PG_BINARY);
3579 errmsg(
"could not open file \"%s\": %m", path)));
3640 oldtup = change->
data.
tp.oldtuple;
3641 newtup = change->
data.
tp.newtuple;
3686 Size prefix_size = strlen(change->
data.
msg.prefix) + 1;
3688 sz += prefix_size + change->
data.
msg.message_size +
3698 memcpy(
data, &prefix_size,
sizeof(
Size));
3702 data += prefix_size;
3708 change->
data.
msg.message_size);
3801 int save_errno = errno;
3806 errno = save_errno ? save_errno : ENOSPC;
3809 errmsg(
"could not write to data file for XID %u: %m",
3870 bool txn_is_streamed;
3999 oldtup = change->
data.
tp.oldtuple;
4000 newtup = change->
data.
tp.newtuple;
4020 Size prefix_size = strlen(change->
data.
msg.prefix) + 1;
4022 sz += prefix_size + change->
data.
msg.message_size +
4119 if (*
fd < 0 && errno == ENOENT)
4128 errmsg(
"could not open file \"%s\": %m",
4150 else if (readBytes < 0)
4153 errmsg(
"could not read from reorderbuffer spill file: %m")));
4157 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4178 errmsg(
"could not read from reorderbuffer spill file: %m")));
4182 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4231 if (change->
data.
tp.oldtuple)
4235 change->
data.
tp.oldtuple =
4239 memcpy(&change->
data.
tp.oldtuple->tuple,
data,
4244 change->
data.
tp.oldtuple->tuple.t_data =
4248 memcpy(change->
data.
tp.oldtuple->tuple.t_data,
data, tuplelen);
4252 if (change->
data.
tp.newtuple)
4260 change->
data.
tp.newtuple =
4264 memcpy(&change->
data.
tp.newtuple->tuple,
data,
4269 change->
data.
tp.newtuple->tuple.t_data =
4273 memcpy(change->
data.
tp.newtuple->tuple.t_data,
data, tuplelen);
4283 memcpy(&prefix_size,
data,
sizeof(
Size));
4287 memcpy(change->
data.
msg.prefix,
data, prefix_size);
4289 data += prefix_size;
4295 change->
data.
msg.message_size);
4297 change->
data.
msg.message_size);
4331 memcpy(newsnap,
data, size);
4394 if (unlink(path) != 0 && errno != ENOENT)
4397 errmsg(
"could not remove file \"%s\": %m", path)));
4410 struct stat statbuf;
4413 sprintf(path,
"pg_replslot/%s", slotname);
4423 if (strncmp(spill_de->
d_name,
"xid", 3) == 0)
4426 "pg_replslot/%s/%s", slotname,
4429 if (unlink(path) != 0)
4432 errmsg(
"could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4465 struct dirent *logical_de;
4468 while ((logical_de =
ReadDir(logical_dir,
"pg_replslot")) != NULL)
4470 if (strcmp(logical_de->
d_name,
".") == 0 ||
4471 strcmp(logical_de->
d_name,
"..") == 0)
4534 newtup = change->
data.
tp.newtuple;
4556 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq 0",
4557 chunk_seq, chunk_id);
4560 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq %d",
4573 elog(
ERROR,
"unexpected type of toast chunk");
4575 ent->
size += chunksize;
4642 elog(
ERROR,
"could not open toast relation with OID %u (base relation \"%s\")",
4652 newtup = change->
data.
tp.newtuple;
4656 for (natt = 0; natt < desc->
natts; natt++)
4665 struct varlena *new_datum = NULL;
4666 struct varlena *reconstructed;
4671 if (attr->attnum < 0)
4674 if (attr->attisdropped)
4678 if (attr->attlen != -1)
4723 ctup = cchange->
data.
tp.newtuple;
4730 memcpy(
VARDATA(reconstructed) + data_done,
4743 memset(&redirect_pointer, 0,
sizeof(redirect_pointer));
4744 redirect_pointer.
pointer = reconstructed;
4748 sizeof(redirect_pointer));
4771 for (natt = 0; natt < desc->
natts; natt++)
4872 elog(
DEBUG3,
"mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
4899 sprintf(path,
"pg_logical/mappings/%s", fname);
4904 errmsg(
"could not open file \"%s\": %m", path)));
4924 errmsg(
"could not read file \"%s\": %m",
4926 else if (readBytes == 0)
4931 errmsg(
"could not read from file \"%s\": read %d instead of %d bytes",
4982 errmsg(
"could not close file \"%s\": %m", path)));
4992 return bsearch(&xid, xip, num,
5005 if (
a->lsn <
b->lsn)
5007 else if (
a->lsn >
b->lsn)
5020 struct dirent *mapping_de;
5026 while ((mapping_de =
ReadDir(mapping_dir,
"pg_logical/mappings")) != NULL)
5037 if (strcmp(mapping_de->
d_name,
".") == 0 ||
5038 strcmp(mapping_de->
d_name,
"..") == 0)
5042 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
5046 &f_dboid, &f_relid, &f_hi, &f_lo,
5047 &f_mapped_xid, &f_create_xid) != 6)
5050 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5053 if (f_dboid != dboid)
5057 if (f_relid != relid)
5079 foreach(file, files)
5104 bool updated_mapping =
false;
5118 memset(&
key, 0,
sizeof(
key));
5148 if (ent == NULL && !updated_mapping)
5152 updated_mapping =
true;
5155 else if (ent == NULL)
void binaryheap_build(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, Datum d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Datum binaryheap_remove_first(binaryheap *heap)
void binaryheap_free(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, Datum d)
Datum binaryheap_first(binaryheap *heap)
static void cleanup(void)
#define BufferIsLocal(buffer)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
#define offsetof(type, field)
#define FLEXIBLE_ARRAY_MEMBER
#define AssertArg(condition)
bool IsToastRelation(Relation relation)
bool IsSharedRelation(Oid relationId)
elog(ERROR, "%s: %s", p2, msg)
#define INDIRECT_POINTER_SIZE
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
void FreeErrorData(ErrorData *edata)
int errcode_for_file_access(void)
void FlushErrorState(void)
int errmsg(const char *fmt,...)
ErrorData * CopyErrorData(void)
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int CloseTransientFile(int fd)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
HeapTupleData * HeapTuple
struct HeapTupleData HeapTupleData
#define SizeofHeapTupleHeader
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
#define dlist_foreach(iter, lhead)
static void dlist_init(dlist_head *head)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
#define dlist_head_element(type, membername, lhead)
static bool dlist_is_empty(dlist_head *head)
static void dlist_delete(dlist_node *node)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
static dlist_node * dlist_pop_head_node(dlist_head *head)
static bool dlist_has_next(dlist_head *head, dlist_node *node)
#define dlist_foreach_modify(iter, lhead)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
#define dlist_container(type, membername, ptr)
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
if(TABLE==NULL||TABLE_index==NULL)
#define ItemPointerGetBlockNumber(pointer)
#define ItemPointerCopy(fromPointer, toPointer)
#define ItemPointerGetOffsetNumber(pointer)
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_sort(List *list, list_sort_comparator cmp)
List * lappend(List *list, void *datum)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * repalloc(void *pointer, Size size)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define SLAB_DEFAULT_BLOCK_SIZE
#define SLAB_LARGE_BLOCK_SIZE
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
#define qsort(a, b, c, d)
#define VARSIZE_SHORT(PTR)
#define DatumGetObjectId(X)
#define VARATT_IS_EXTENDED(PTR)
#define VARATT_IS_SHORT(PTR)
#define SET_VARSIZE_COMPRESSED(PTR, len)
#define DatumGetPointer(X)
#define SET_VARTAG_EXTERNAL(PTR, tag)
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
#define VARDATA_EXTERNAL(PTR)
#define SET_VARSIZE(PTR, len)
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
#define VARATT_IS_EXTERNAL(PTR)
#define PointerGetDatum(X)
static int fd(const char *x, int i)
#define RelationIsLogicallyLogged(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationIsValid(relation)
Relation RelationIdGetRelation(Oid relationId)
void RelationClose(Relation relation)
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
#define relpathperm(rnode, forknum)
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferDiskChange ReorderBufferDiskChange
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
#define IsSpecInsert(action)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
int logical_decoding_work_mem
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define IsInsertOrUpdate(action)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
struct RewriteMappingFile RewriteMappingFile
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
static void SetupCheckXidLive(TransactionId xid)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)