57 int segsize, const
char *waldir);
60 #define MAX_ERRORMSG_LEN 1000
66 #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
83 state->errormsg_deferred =
true;
96 state->decode_buffer = buffer;
97 state->decode_buffer_size = size;
98 state->decode_buffer_tail = buffer;
99 state->decode_buffer_head = buffer;
120 state->routine = *routine;
142 state->private_data = private_data;
146 if (!
state->errormsg_buf)
152 state->errormsg_buf[0] =
'\0';
172 if (
state->seg.ws_file != -1)
175 if (
state->decode_buffer &&
state->free_decode_buffer)
179 if (
state->readRecordBuf)
202 uint32 newSize = reclength;
204 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
205 newSize =
Max(newSize, 5 *
Max(BLCKSZ, XLOG_BLCKSZ));
207 if (
state->readRecordBuf)
209 state->readRecordBuf =
211 if (
state->readRecordBuf == NULL)
213 state->readRecordBufSize = 0;
216 state->readRecordBufSize = newSize;
225 int segsize,
const char *waldir)
255 state->EndRecPtr = RecPtr;
256 state->NextRecPtr = RecPtr;
278 record =
state->record;
281 state->record = NULL;
285 if (
state->decode_queue_tail == record)
286 state->decode_queue_tail = NULL;
297 Assert(
state->decode_buffer_head == (
char *) record);
304 record = record->next;
305 while (
unlikely(record && record->oversized))
306 record = record->next;
311 state->decode_buffer_head = (
char *) record;
321 state->decode_buffer_head =
state->decode_buffer;
322 state->decode_buffer_tail =
state->decode_buffer;
347 if (
state->decode_queue_head == NULL)
350 if (
state->errormsg_deferred)
352 if (
state->errormsg_buf[0] !=
'\0')
353 *errormsg =
state->errormsg_buf;
354 state->errormsg_deferred =
false;
386 return state->record;
463 if (
state->decode_buffer_size == 0)
466 state->decode_buffer_head =
state->decode_buffer;
467 state->decode_buffer_tail =
state->decode_buffer;
468 state->free_decode_buffer =
true;
472 if (
state->decode_buffer_tail >=
state->decode_buffer_head)
475 if (
state->decode_buffer_tail + required_space <=
476 state->decode_buffer +
state->decode_buffer_size)
483 else if (
state->decode_buffer + required_space <
484 state->decode_buffer_head)
495 if (
state->decode_buffer_tail + required_space <
496 state->decode_buffer_head)
543 state->errormsg_buf[0] =
'\0';
549 RecPtr =
state->NextRecPtr;
576 state->nonblocking = nonblocking;
577 state->currRecPtr = RecPtr;
580 targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
581 targetRecOff = RecPtr % XLOG_BLCKSZ;
600 if (targetRecOff == 0)
605 RecPtr += pageHeaderSize;
606 targetRecOff = pageHeaderSize;
608 else if (targetRecOff < pageHeaderSize)
612 pageHeaderSize, targetRecOff);
617 targetRecOff == pageHeaderSize)
660 "invalid record length at %X/%X: expected at least %u, got %u",
677 if (decoded == NULL && nonblocking)
687 len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
702 Assert(
state->readRecordBufSize >= XLOG_BLCKSZ * 2);
706 memcpy(
state->readRecordBuf,
707 state->readBuf + RecPtr % XLOG_BLCKSZ,
len);
708 buffer =
state->readRecordBuf +
len;
714 targetPagePtr += XLOG_BLCKSZ;
740 state->overwrittenRecPtr = RecPtr;
741 RecPtr = targetPagePtr;
749 "there is no contrecord flag at %X/%X",
762 "invalid contrecord length %u (expected %lld) at %X/%X",
764 ((
long long) total_len) - gotlen,
778 contdata = (
char *)
state->readBuf + pageHeaderSize;
779 len = XLOG_BLCKSZ - pageHeaderSize;
785 pageHeaderSize +
len);
787 memcpy(buffer, (
char *) contdata,
len);
807 if (total_len >
state->readRecordBufSize)
809 char save_copy[XLOG_BLCKSZ * 2];
816 Assert(gotlen <= state->readRecordBufSize);
817 memcpy(save_copy,
state->readRecordBuf, gotlen);
825 memcpy(
state->readRecordBuf, save_copy, gotlen);
826 buffer =
state->readRecordBuf + gotlen;
828 }
while (gotlen < total_len);
836 state->DecodeRecPtr = RecPtr;
837 state->NextRecPtr = targetPagePtr + pageHeaderSize
844 Min(targetRecOff + total_len, XLOG_BLCKSZ));
856 state->DecodeRecPtr = RecPtr;
862 if (record->
xl_rmid == RM_XLOG_ID &&
866 state->NextRecPtr +=
state->segcxt.ws_segsize - 1;
887 "out of memory while trying to decode a record of length %u", total_len);
905 if ((
char *) decoded ==
state->decode_buffer)
908 state->decode_buffer_tail += decoded->
size;
913 if (
state->decode_queue_tail)
914 state->decode_queue_tail->
next = decoded;
915 state->decode_queue_tail = decoded;
916 if (!
state->decode_queue_head)
917 state->decode_queue_head = decoded;
933 state->abortedRecPtr = RecPtr;
934 state->missingContrecPtr = targetPagePtr;
941 state->errormsg_deferred =
true;
975 if (
state->errormsg_deferred)
982 return state->decode_queue_tail;
1012 Assert((pageptr % XLOG_BLCKSZ) == 0);
1018 if (targetSegNo ==
state->seg.ws_segno &&
1019 targetPageOff ==
state->segoff && reqLen <= state->
readLen)
1020 return state->readLen;
1041 if (targetSegNo !=
state->seg.ws_segno && targetPageOff != 0)
1043 XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
1102 state->seg.ws_segno = targetSegNo;
1103 state->segoff = targetPageOff;
1120 state->seg.ws_segno = 0;
1139 "invalid record length at %X/%X: expected at least %u, got %u",
1147 "invalid resource manager ID %u at %X/%X",
1157 if (!(record->
xl_prev < RecPtr))
1160 "record with incorrect prev-link %X/%X at %X/%X",
1173 if (record->
xl_prev != PrevRecPtr)
1176 "record with incorrect prev-link %X/%X at %X/%X",
1214 "incorrect resource manager data checksum in record at %X/%X",
1236 Assert((recptr % XLOG_BLCKSZ) == 0);
1248 "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u",
1263 "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
1275 if (
state->system_identifier &&
1279 "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
1280 (
unsigned long long) longhdr->
xlp_sysid,
1281 (
unsigned long long)
state->system_identifier);
1287 "WAL file is from different database system: incorrect segment size in page header");
1293 "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
1297 else if (offset == 0)
1305 "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
1325 "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u",
1342 if (recptr >
state->latestPagePtr)
1351 "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u",
1353 state->latestPageTLI,
1360 state->latestPagePtr = recptr;
1372 state->errormsg_buf[0] =
'\0';
1373 state->errormsg_deferred =
false;
1398 state->nonblocking =
false;
1421 targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1424 targetPagePtr = tmpRecPtr - targetRecOff;
1453 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1460 tmpRecPtr = targetPagePtr + pageHeaderSize
1467 tmpRecPtr = targetPagePtr + pageHeaderSize;
1481 if (RecPtr <= state->ReadRecPtr)
1484 found =
state->ReadRecPtr;
1536 if (
state->seg.ws_file < 0 ||
1538 tli !=
state->seg.ws_tli)
1542 if (
state->seg.ws_file >= 0)
1546 state->routine.segment_open(
state, nextSegNo, &tli);
1552 state->seg.ws_tli = tli;
1553 state->seg.ws_segno = nextSegNo;
1557 if (nbytes > (
state->segcxt.ws_segsize - startoff))
1558 segbytes =
state->segcxt.ws_segsize - startoff;
1568 readbytes =
pg_pread(
state->seg.ws_file, p, segbytes, (off_t) startoff);
1585 recptr += readbytes;
1586 nbytes -= readbytes;
1608 while ((r =
state->decode_queue_head) != NULL)
1614 state->decode_queue_tail = NULL;
1615 state->decode_queue_head = NULL;
1616 state->record = NULL;
1619 state->decode_buffer_tail =
state->decode_buffer;
1620 state->decode_buffer_head =
state->decode_buffer;
1623 state->errormsg_buf[0] =
'\0';
1624 state->errormsg_deferred =
false;
1648 size += (MAXIMUM_ALIGNOF - 1);
1652 size += (MAXIMUM_ALIGNOF - 1);
1679 #define COPY_HEADER_FIELD(_dst, _size) \
1681 if (remaining < _size) \
1682 goto shortdata_err; \
1683 memcpy(_dst, ptr, _size); \
1685 remaining -= _size; \
1695 decoded->
header = *record;
1697 decoded->
next = NULL;
1703 ptr = (
char *) record;
1716 uint8 main_data_len;
1721 datatotal += main_data_len;
1732 datatotal += main_data_len;
1754 if (block_id <= decoded->max_block_id)
1757 "out-of-order block_id %u at %X/%X",
1764 blk = &decoded->
blocks[block_id];
1770 blk->
flags = fork_flags;
1781 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1788 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1824 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1840 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1854 "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
1869 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
1882 if (rlocator == NULL)
1885 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1897 "invalid block_id %u at %X/%X",
1913 out = ((
char *) decoded) +
1918 for (block_id = 0; block_id <= decoded->
max_block_id; block_id++)
1964 "record with invalid length at %X/%X",
1967 *errormsg =
state->errormsg_buf;
1987 elog(
ERROR,
"could not locate backup block with ID %d in WAL record",
1990 pg_fatal(
"could not locate backup block with ID %d in WAL record",
2021 *blknum = bkpb->
blkno;
2022 if (prefetch_buffer)
2074 "could not restore image at %X/%X with invalid block %d specified",
2093 bool decomp_success =
true;
2099 decomp_success =
false;
2104 if (LZ4_decompress_safe(ptr, tmp.
data,
2106 decomp_success =
false;
2108 report_invalid_record(record,
"could not restore image at %X/%X compressed with %s not supported by build, block %d",
2118 size_t decomp_result = ZSTD_decompress(tmp.
data,
2122 if (ZSTD_isError(decomp_result))
2123 decomp_success =
false;
2125 report_invalid_record(record,
"could not restore image at %X/%X compressed with %s not supported by build, block %d",
2134 report_invalid_record(record,
"could not restore image at %X/%X compressed with unknown method, block %d",
2140 if (!decomp_success)
2154 memcpy(page, ptr, BLCKSZ);
#define pg_attribute_printf(f, a)
#define MemSet(start, val, len)
elog(ERROR, "%s: %s", p2, msg)
void err(int eval, const char *fmt,...)
#define MCXT_ALLOC_NO_OOM
if(TABLE==NULL||TABLE_index==NULL)
static void const char * fmt
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
void * palloc_extended(Size size, int flags)
#define AmStartupProcess()
#define InvalidRepOriginId
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
int32 pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize, bool check_complete)
#define RmgrIdIsValid(rmid)
struct DecodedXLogRecord * next
TransactionId toplevel_xid
RepOriginId record_origin
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
FullTransactionId nextXid
DecodedXLogRecord * record
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
static FullTransactionId FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
VariableCache ShmemVariableCache
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
static const unsigned __int64 epoch
#define XLP_FIRST_IS_CONTRECORD
XLogLongPageHeaderData * XLogLongPageHeader
#define XLP_FIRST_IS_OVERWRITE_CONTRECORD
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
XLogPageHeaderData * XLogPageHeader
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XRecOffIsValid(xlrp)
#define SizeOfXLogShortPHD
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLogPageHeaderSize(hdr)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
static void static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength)
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
void XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
void XLogReaderResetError(XLogReaderState *state)
static void XLogReaderInvalReadState(XLogReaderState *state)
#define COPY_HEADER_FIELD(_dst, _size)
bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr)
FullTransactionId XLogRecGetFullXid(XLogReaderState *record)
void XLogReaderFree(XLogReaderState *state)
void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
static void ResetDecoder(XLogReaderState *state)
bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, char **errormsg)
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
char * XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
#define DEFAULT_DECODE_BUFFER_SIZE
size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
bool RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
static DecodedXLogRecord * XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess)
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
#define XLogRecGetXid(decoder)
#define XLogRecHasBlockRef(decoder, block_id)
#define BKPIMAGE_COMPRESS_ZSTD
#define BKPBLOCK_FORK_MASK
#define BKPBLOCK_HAS_DATA
#define BKPIMAGE_HAS_HOLE
#define XLR_BLOCK_ID_DATA_LONG
#define BKPIMAGE_COMPRESS_LZ4
#define BKPIMAGE_COMPRESSED(info)
#define XLR_BLOCK_ID_TOPLEVEL_XID
#define XLR_BLOCK_ID_DATA_SHORT
#define BKPBLOCK_SAME_REL
#define BKPIMAGE_COMPRESS_PGLZ
#define XLR_BLOCK_ID_ORIGIN
#define BKPBLOCK_HAS_IMAGE