184#define IsSpecInsert(action) \
186 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
188#define IsSpecConfirmOrAbort(action) \
190 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
191 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
193#define IsInsertOrUpdate(action) \
195 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
196 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
197 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
302 bool addition,
Size sz);
325 memset(&hash_ctl, 0,
sizeof(hash_ctl));
454 if (txn->
gid != NULL)
608 alloc_len =
sizeof(
Oid) * nrelids;
632 bool *is_new,
XLogRecPtr lsn,
bool create_as_top)
706 Assert(!create || txn != NULL);
754 toptxn->
txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
765 toptxn->
txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
853 bool transactional,
const char *prefix,
854 Size message_size,
const char *message)
886 volatile Snapshot snapshot_now = snap;
898 rb->
message(rb, txn, lsn,
false, prefix, message_size, message);
922#ifdef USE_ASSERT_CHECKING
955 Assert(prev_first_lsn < cur_txn->first_lsn);
975 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
993#ifdef USE_ASSERT_CHECKING
1010 Assert(prev_lsn <= cur_change->lsn);
1012 prev_lsn = cur_change->
lsn;
1247 else if (pos_a == pos_b)
1302 state->nr_txns = nr_txns;
1305 for (off = 0; off <
state->nr_txns; off++)
1307 state->entries[off].file.vfd = -1;
1308 state->entries[off].segno = 0;
1317 *iter_state =
state;
1336 &
state->entries[off].segno);
1342 state->entries[off].lsn = cur_change->
lsn;
1343 state->entries[off].change = cur_change;
1344 state->entries[off].txn = txn;
1365 &
state->entries[off].file,
1366 &
state->entries[off].segno);
1371 state->entries[off].lsn = cur_change->
lsn;
1372 state->entries[off].change = cur_change;
1373 state->entries[off].txn = cur_txn;
1397 if (
state->heap->bh_size == 0)
1401 entry = &
state->entries[off];
1427 state->entries[off].lsn = next_change->
lsn;
1428 state->entries[off].change = next_change;
1451 &
state->entries[off].segno))
1458 elog(
DEBUG2,
"restored %u/%u changes from disk",
1464 state->entries[off].lsn = next_change->
lsn;
1465 state->entries[off].change = next_change;
1487 for (off = 0; off <
state->nr_txns; off++)
1489 if (
state->entries[off].file.vfd != -1)
2009 int nrelations,
Relation *relations,
2048 if (snapshot_now->
copied)
2076 if (specinsert != NULL)
2120 volatile bool stream_started =
false;
2144 int changes_count = 0;
2182 stream_started =
true;
2193 prev_lsn = change->
lsn;
2202 curtxn = change->
txn;
2215 if (specinsert == NULL)
2216 elog(
ERROR,
"invalid ordering of speculative insertion changes");
2218 change = specinsert;
2247 elog(
ERROR,
"could not map filenumber \"%s\" to relation OID",
2254 elog(
ERROR,
"could not open relation with OID %u (for filenumber \"%s\")",
2275 if (relation->
rd_rel->relkind == RELKIND_SEQUENCE)
2317 if (specinsert != NULL)
2346 if (specinsert != NULL)
2354 specinsert = change;
2367 if (specinsert != NULL)
2391 for (
i = 0;
i < nrelids;
i++)
2399 elog(
ERROR,
"could not open relation with OID %u", relid);
2404 relations[nrelations++] = rel;
2412 for (
i = 0;
i < nrelations;
i++)
2432 if (snapshot_now->
copied)
2463 if (command_id < change->
data.command_id)
2467 if (!snapshot_now->
copied)
2474 snapshot_now->
curcid = command_id;
2483 elog(
ERROR,
"tuplecid value in changequeue");
2497#define CHANGES_THRESHOLD 100
2536 stream_started =
false;
2546 rb->
prepare(rb, txn, commit_lsn);
2548 rb->
commit(rb, txn, commit_lsn);
2553 elog(
ERROR,
"output plugin used XID %u",
2562 else if (snapshot_now->
copied)
2641 if (errdata->
sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2655 command_id, prev_lsn,
2755 origin_id, origin_lsn);
2850 XLogRecPtr origin_lsn,
char *gid,
bool is_commit)
2866 prepare_end_lsn = txn->
end_lsn;
2880 if ((txn->
final_lsn < two_phase_at) && is_commit)
3113 for (
i = 0;
i < ninvalidations;
i++)
3224 bool addition,
Size sz)
3392 for (
i = 0;
i < nmsgs;
i++)
3459 xids[xcnt++] = txn->
xid;
3596 Size largest_size = 0;
3611 if ((largest == NULL || txn->
total_size > largest_size) &&
3718 elog(
DEBUG2,
"spill %u changes in XID %u to disk",
3760 O_CREAT | O_WRONLY | O_APPEND |
PG_BINARY);
3765 errmsg(
"could not open file \"%s\": %m", path)));
3835 oldlen = oldtup->
t_len;
3842 newlen = newtup->
t_len;
3887 memcpy(
data, &prefix_size,
sizeof(
Size));
3891 data += prefix_size;
3990 int save_errno = errno;
3995 errno = save_errno ? save_errno : ENOSPC;
3998 errmsg(
"could not write to data file for XID %u: %m",
4059 bool txn_is_streamed;
4194 oldlen = oldtup->
t_len;
4201 newlen = newtup->
t_len;
4310 if (*
fd < 0 && errno == ENOENT)
4319 errmsg(
"could not open file \"%s\": %m",
4331 file->
curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4341 else if (readBytes < 0)
4344 errmsg(
"could not read from reorderbuffer spill file: %m")));
4348 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4364 WAIT_EVENT_REORDER_BUFFER_READ);
4369 errmsg(
"could not read from reorderbuffer spill file: %m")));
4373 errmsg(
"could not read from reorderbuffer spill file: read %d instead of %u bytes",
4474 memcpy(&prefix_size,
data,
sizeof(
Size));
4480 data += prefix_size;
4585 if (unlink(path) != 0 && errno != ENOENT)
4588 errmsg(
"could not remove file \"%s\": %m", path)));
4601 struct stat statbuf;
4614 if (strncmp(spill_de->
d_name,
"xid", 3) == 0)
4620 if (unlink(path) != 0)
4623 errmsg(
"could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4657 struct dirent *logical_de;
4662 if (strcmp(logical_de->
d_name,
".") == 0 ||
4663 strcmp(logical_de->
d_name,
"..") == 0)
4745 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq 0",
4746 chunk_seq, chunk_id);
4749 elog(
ERROR,
"got sequence entry %d for toast chunk %u instead of seq %d",
4762 elog(
ERROR,
"unexpected type of toast chunk");
4764 ent->
size += chunksize;
4831 elog(
ERROR,
"could not open toast relation with OID %u (base relation \"%s\")",
4845 for (natt = 0; natt < desc->
natts; natt++)
4854 struct varlena *new_datum = NULL;
4855 struct varlena *reconstructed;
4860 if (attr->attnum < 0)
4863 if (attr->attisdropped)
4867 if (attr->attlen != -1)
4919 memcpy(
VARDATA(reconstructed) + data_done,
4932 memset(&redirect_pointer, 0,
sizeof(redirect_pointer));
4933 redirect_pointer.
pointer = reconstructed;
4937 sizeof(redirect_pointer));
4960 for (natt = 0; natt < desc->
natts; natt++)
5061 elog(
DEBUG3,
"mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5093 errmsg(
"could not open file \"%s\": %m", path)));
5113 errmsg(
"could not read file \"%s\": %m",
5115 else if (readBytes == 0)
5120 errmsg(
"could not read from file \"%s\": read %d instead of %d bytes",
5165 errmsg(
"could not close file \"%s\": %m", path)));
5175 return bsearch(&xid, xip, num,