184 #define IsSpecInsert(action) \
186 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
188 #define IsSpecConfirmOrAbort(action) \
190 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
191 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
193 #define IsInsertOrUpdate(action) \
195 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
196 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
197 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
302 bool addition,
Size sz);
325 memset(&hash_ctl, 0,
sizeof(hash_ctl));
449 if (txn->
gid != NULL)
510 if (change->
data.
tp.newtuple)
513 change->
data.
tp.newtuple = NULL;
516 if (change->
data.
tp.oldtuple)
519 change->
data.
tp.oldtuple = NULL;
523 if (change->
data.
msg.prefix != NULL)
525 change->
data.
msg.prefix = NULL;
526 if (change->
data.
msg.message != NULL)
528 change->
data.
msg.message = NULL;
603 alloc_len =
sizeof(
Oid) * nrelids;
627 bool *is_new,
XLogRecPtr lsn,
bool create_as_top)
701 Assert(!create || txn != NULL);
748 change->
data.
tp.clear_toast_afterwards)
848 bool transactional,
const char *prefix,
849 Size message_size,
const char *message)
870 change->
data.
msg.message_size = message_size;
872 memcpy(change->
data.
msg.message, message, message_size);
881 volatile Snapshot snapshot_now = snap;
893 rb->
message(rb, txn, lsn,
false, prefix, message_size, message);
917 #ifdef USE_ASSERT_CHECKING
950 Assert(prev_first_lsn < cur_txn->first_lsn);
970 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
988 #ifdef USE_ASSERT_CHECKING
1005 Assert(prev_lsn <= cur_change->lsn);
1007 prev_lsn = cur_change->
lsn;
1242 else if (pos_a == pos_b)
1297 state->nr_txns = nr_txns;
1300 for (off = 0; off <
state->nr_txns; off++)
1302 state->entries[off].file.vfd = -1;
1303 state->entries[off].segno = 0;
1312 *iter_state =
state;
1331 &
state->entries[off].segno);
1337 state->entries[off].lsn = cur_change->
lsn;
1338 state->entries[off].change = cur_change;
1339 state->entries[off].txn = txn;
1360 &
state->entries[off].file,
1361 &
state->entries[off].segno);
1366 state->entries[off].lsn = cur_change->
lsn;
1367 state->entries[off].change = cur_change;
1368 state->entries[off].txn = cur_txn;
1392 if (
state->heap->bh_size == 0)
1396 entry = &
state->entries[off];
1422 state->entries[off].lsn = next_change->
lsn;
1423 state->entries[off].change = next_change;
1446 &
state->entries[off].segno))
1453 elog(
DEBUG2,
"restored %u/%u changes from disk",
1459 state->entries[off].lsn = next_change->
lsn;
1460 state->entries[off].change = next_change;
1482 for (off = 0; off <
state->nr_txns; off++)
1484 if (
state->entries[off].file.vfd != -1)
2004 int nrelations,
Relation *relations,
2023 change->
data.
msg.message_size,
2028 change->
data.
msg.message_size,
2043 if (snapshot_now->
copied)
2071 if (specinsert != NULL)
2115 volatile bool stream_started =
false;
2139 int changes_count = 0;
2177 stream_started =
true;
2188 prev_lsn = change->
lsn;
2197 curtxn = change->
txn;
2210 if (specinsert == NULL)
2211 elog(
ERROR,
"invalid ordering of speculative insertion changes");
2213 change = specinsert;
2223 change->
data.
tp.rlocator.relNumber);
2238 change->
data.
tp.newtuple == NULL &&
2239 change->
data.
tp.oldtuple == NULL)
2242 elog(
ERROR,
"could not map filenumber \"%s\" to relation OID",
2249 elog(
ERROR,
"could not open relation with OID %u (for filenumber \"%s\")",
2270 if (relation->
rd_rel->relkind == RELKIND_SEQUENCE)
2285 if (change->
data.
tp.clear_toast_afterwards)
2312 if (specinsert != NULL)
2341 if (specinsert != NULL)
2349 specinsert = change;
2362 if (specinsert != NULL)
2386 for (
i = 0;
i < nrelids;
i++)
2394 elog(
ERROR,
"could not open relation with OID %u", relid);
2399 relations[nrelations++] = rel;
2407 for (
i = 0;
i < nrelations;
i++)
2427 if (snapshot_now->
copied)
2458 if (command_id < change->
data.command_id)
2462 if (!snapshot_now->
copied)
2469 snapshot_now->
curcid = command_id;
2478 elog(
ERROR,
"tuplecid value in changequeue");
2492 #define CHANGES_THRESHOLD 100
2531 stream_started =
false;
2541 rb->
prepare(rb, txn, commit_lsn);
2543 rb->
commit(rb, txn, commit_lsn);
2548 elog(
ERROR,
"output plugin used XID %u",
2557 else if (snapshot_now->
copied)
2636 if (errdata->
sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2650 command_id, prev_lsn,
2750 origin_id, origin_lsn);
2845 XLogRecPtr origin_lsn,
char *gid,
bool is_commit)
2861 prepare_end_lsn = txn->
end_lsn;
2875 if ((txn->
final_lsn < two_phase_at) && is_commit)
3108 for (
i = 0;
i < ninvalidations;
i++)
3219 bool addition,
Size sz)
3367 change->
data.
inval.ninvalidations = nmsgs;
3370 memcpy(change->
data.
inval.invalidations, msgs,
3387 for (
i = 0;
i < nmsgs;
i++)
3454 xids[xcnt++] = txn->
xid;
3591 Size largest_size = 0;
3606 if ((largest == NULL || txn->
total_size > largest_size) &&
3714 elog(
DEBUG2,
"spill %u changes in XID %u to disk",
3756 O_CREAT | O_WRONLY | O_APPEND |
PG_BINARY);
3761 errmsg(
"could not open file \"%s\": %m", path)));
3825 oldtup = change->
data.
tp.oldtuple;
3826 newtup = change->
data.
tp.newtuple;
3831 oldlen = oldtup->
t_len;
3838 newlen = newtup->
t_len;
3871 Size prefix_size = strlen(change->
data.
msg.prefix) + 1;
3873 sz += prefix_size + change->
data.
msg.message_size +
3883 memcpy(
data, &prefix_size,
sizeof(
Size));
3887 data += prefix_size;
3893 change->
data.
msg.message_size);
3986 int save_errno = errno;
3991 errno = save_errno ? save_errno : ENOSPC;
3994 errmsg(
"could not write to data file for XID %u: %m",
4055 bool txn_is_streamed;
4184 oldtup = change->
data.
tp.oldtuple;
4185 newtup = change->
data.
tp.newtuple;
4190 oldlen = oldtup->
t_len;
4197 newlen = newtup->
t_len;
4205 Size prefix_size = strlen(change->
data.
msg.prefix) + 1;
4207 sz += prefix_size + change->
data.
msg.message_size +
4306 if (*
fd < 0 && errno == ENOENT)
4315 errmsg(
"could not open file \"%s\": %m",
4327 file->
curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4337 else if (readBytes < 0)
4340 errmsg(
"could not read from reorderbuffer spill file: %m")));
4344 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4360 WAIT_EVENT_REORDER_BUFFER_READ);
4365 errmsg(
"could not read from reorderbuffer spill file: %m")));
4369 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4418 if (change->
data.
tp.oldtuple)
4422 change->
data.
tp.oldtuple =
4431 change->
data.
tp.oldtuple->t_data =
4435 memcpy(change->
data.
tp.oldtuple->t_data,
data, tuplelen);
4439 if (change->
data.
tp.newtuple)
4447 change->
data.
tp.newtuple =
4456 change->
data.
tp.newtuple->t_data =
4460 memcpy(change->
data.
tp.newtuple->t_data,
data, tuplelen);
4470 memcpy(&prefix_size,
data,
sizeof(
Size));
4474 memcpy(change->
data.
msg.prefix,
data, prefix_size);
4476 data += prefix_size;
4482 change->
data.
msg.message_size);
4484 change->
data.
msg.message_size);
4581 if (unlink(path) != 0 && errno != ENOENT)
4584 errmsg(
"could not remove file \"%s\": %m", path)));
4597 struct stat statbuf;
4610 if (strncmp(spill_de->
d_name,
"xid", 3) == 0)
4616 if (unlink(path) != 0)
4619 errmsg(
"could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4653 struct dirent *logical_de;
4658 if (strcmp(logical_de->
d_name,
".") == 0 ||
4659 strcmp(logical_de->
d_name,
"..") == 0)
4722 newtup = change->
data.
tp.newtuple;
4741 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq 0",
4742 chunk_seq, chunk_id);
4745 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq %d",
4758 elog(
ERROR,
"unexpected type of toast chunk");
4760 ent->
size += chunksize;
4827 elog(
ERROR,
"could not open toast relation with OID %u (base relation \"%s\")",
4837 newtup = change->
data.
tp.newtuple;
4841 for (natt = 0; natt < desc->
natts; natt++)
4850 struct varlena *new_datum = NULL;
4851 struct varlena *reconstructed;
4856 if (attr->attnum < 0)
4859 if (attr->attisdropped)
4863 if (attr->attlen != -1)
4908 ctup = cchange->
data.
tp.newtuple;
4915 memcpy(
VARDATA(reconstructed) + data_done,
4928 memset(&redirect_pointer, 0,
sizeof(redirect_pointer));
4929 redirect_pointer.
pointer = reconstructed;
4933 sizeof(redirect_pointer));
4956 for (natt = 0; natt < desc->
natts; natt++)
5057 elog(
DEBUG3,
"mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5089 errmsg(
"could not open file \"%s\": %m", path)));
5109 errmsg(
"could not read file \"%s\": %m",
5111 else if (readBytes == 0)
5116 errmsg(
"could not read from file \"%s\": read %d instead of %d bytes",
5161 errmsg(
"could not close file \"%s\": %m", path)));
5171 return bsearch(&xid, xip, num,
5195 struct dirent *mapping_de;
5212 if (strcmp(mapping_de->
d_name,
".") == 0 ||
5213 strcmp(mapping_de->
d_name,
"..") == 0)
5217 if (strncmp(mapping_de->
d_name,
"map-", 4) != 0)
5221 &f_dboid, &f_relid, &f_hi, &f_lo,
5222 &f_mapped_xid, &f_create_xid) != 6)
5225 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5228 if (f_dboid != dboid)
5232 if (f_relid != relid)
5254 foreach(file, files)
5279 bool updated_mapping =
false;
5293 memset(&
key, 0,
sizeof(
key));
5320 if (ent == NULL && !updated_mapping)
5324 updated_mapping =
true;
5327 else if (ent == NULL)
void binaryheap_build(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
bh_node_type binaryheap_first(binaryheap *heap)
bh_node_type binaryheap_remove_first(binaryheap *heap)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
void binaryheap_free(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
static void cleanup(void)
#define BufferIsLocal(buffer)
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
bool IsToastRelation(Relation relation)
bool IsSharedRelation(Oid relationId)
#define INDIRECT_POINTER_SIZE
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
void FreeErrorData(ErrorData *edata)
int errcode_for_file_access(void)
void FlushErrorState(void)
int errmsg(const char *fmt,...)
ErrorData * CopyErrorData(void)
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int CloseTransientFile(int fd)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
HeapTupleData * HeapTuple
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
#define SizeofHeapTupleHeader
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
#define dlist_foreach(iter, lhead)
static void dlist_init(dlist_head *head)
#define dclist_container(type, membername, ptr)
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
#define dlist_head_element(type, membername, lhead)
static void dlist_delete(dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
static dlist_node * dlist_pop_head_node(dlist_head *head)
#define dlist_foreach_modify(iter, lhead)
static bool dlist_is_empty(const dlist_head *head)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dlist_container(type, membername, ptr)
#define dclist_foreach(iter, lhead)
static int pg_cmp_u64(uint64 a, uint64 b)
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
if(TABLE==NULL||TABLE_index==NULL)
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
void list_sort(List *list, list_sort_comparator cmp)
List * lappend(List *list, void *datum)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * repalloc(void *pointer, Size size)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define SLAB_DEFAULT_BLOCK_SIZE
#define SLAB_LARGE_BLOCK_SIZE
#define CHECK_FOR_INTERRUPTS()
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap_node * pairingheap_first(pairingheap *heap)
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
FormData_pg_attribute * Form_pg_attribute
#define qsort(a, b, c, d)