PostgreSQL Source Code  git master
xlogreader.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecord.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "replication/origin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
Include dependency graph for xlogreader.c:

Go to the source code of this file.

Macros

#define MAX_ERRORMSG_LEN   1000
 
#define DEFAULT_DECODE_BUFFER_SIZE   (64 * 1024)
 
#define COPY_HEADER_FIELD(_dst, _size)
 

Functions

static void report_invalid_record (XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3)
 
static bool allocate_recordbuf (XLogReaderState *state, uint32 reclength)
 
static int ReadPageInternal (XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
static void XLogReaderInvalReadState (XLogReaderState *state)
 
static XLogPageReadResult XLogDecodeNextRecord (XLogReaderState *state, bool non_blocking)
 
static bool ValidXLogRecordHeader (XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess)
 
static bool ValidXLogRecord (XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 
static void ResetDecoder (XLogReaderState *state)
 
static void WALOpenSegmentInit (WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
 
void XLogReaderSetDecodeBuffer (XLogReaderState *state, void *buffer, size_t size)
 
XLogReaderState * XLogReaderAllocate (int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
 
void XLogReaderFree (XLogReaderState *state)
 
void XLogBeginRead (XLogReaderState *state, XLogRecPtr RecPtr)
 
void XLogReleasePreviousRecord (XLogReaderState *state)
 
DecodedXLogRecord * XLogNextRecord (XLogReaderState *state, char **errormsg)
 
XLogRecord * XLogReadRecord (XLogReaderState *state, char **errormsg)
 
static DecodedXLogRecord * XLogReadRecordAlloc (XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
 
DecodedXLogRecord * XLogReadAhead (XLogReaderState *state, bool nonblocking)
 
bool XLogReaderValidatePageHeader (XLogReaderState *state, XLogRecPtr recptr, char *phdr)
 
XLogRecPtr XLogFindNextRecord (XLogReaderState *state, XLogRecPtr RecPtr)
 
bool WALRead (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
 
size_t DecodeXLogRecordRequiredSpace (size_t xl_tot_len)
 
bool DecodeXLogRecord (XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, char **errormsg)
 
void XLogRecGetBlockTag (XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
 
bool XLogRecGetBlockTagExtended (XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
 
char * XLogRecGetBlockData (XLogReaderState *record, uint8 block_id, Size *len)
 
bool RestoreBlockImage (XLogReaderState *record, uint8 block_id, char *page)
 
FullTransactionId XLogRecGetFullXid (XLogReaderState *record)
 

Macro Definition Documentation

◆ COPY_HEADER_FIELD

#define COPY_HEADER_FIELD (   _dst,
  _size 
)
Value:
do { \
if (remaining < _size) \
goto shortdata_err; \
memcpy(_dst, ptr, _size); \
ptr += _size; \
remaining -= _size; \
} while(0)
int remaining
Definition: informix.c:667

◆ DEFAULT_DECODE_BUFFER_SIZE

#define DEFAULT_DECODE_BUFFER_SIZE   (64 * 1024)

Definition at line 66 of file xlogreader.c.

◆ MAX_ERRORMSG_LEN

#define MAX_ERRORMSG_LEN   1000

Definition at line 60 of file xlogreader.c.

Function Documentation

◆ allocate_recordbuf()

static bool allocate_recordbuf ( XLogReaderState * state,
uint32  reclength 
)
static

Definition at line 197 of file xlogreader.c.

/*
 * allocate_recordbuf: (re)allocate state->readRecordBuf so that it can hold
 * a record of at least 'reclength' bytes.
 *
 * Any existing buffer is freed before the new one is allocated.  Returns
 * true on success, with state->readRecordBufSize set to the new size.
 * Returns false if the computed size is not a valid allocation size
 * (checked only when FRONTEND is not defined) or if the allocation itself
 * fails; in the latter case state->readRecordBufSize is reset to 0.
 */
198 {
199  uint32 newSize = reclength;
200 
/*
 * Round up to a multiple of XLOG_BLCKSZ (a whole extra block is added when
 * reclength is already a multiple), then enforce a minimum of
 * 5 * Max(BLCKSZ, XLOG_BLCKSZ).
 */
201  newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
202  newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
203 
204 #ifndef FRONTEND
205 
206  /*
207  * Note that in very unlucky circumstances, the random data read from a
208  * recycled segment can cause this routine to be called with a size
209  * causing a hard failure at allocation. For a standby, this would cause
210  * the instance to stop suddenly with a hard failure, preventing it from
211  * retrying to fetch WAL from one of its sources, which could allow it to
212  * move on with replay without a manual restart. If the data comes from a
213  * past recycled segment and is still valid, then the allocation may
214  * succeed but record checks are going to fail so this would be
215  * short-lived. If the allocation fails because of a memory shortage, then
216  * this is not a hard failure either per the guarantee given by
217  * MCXT_ALLOC_NO_OOM.
      */
218  if (!AllocSizeIsValid(newSize))
219  return false;
220 
221 #endif
222 
223  if (state->readRecordBuf)
224  pfree(state->readRecordBuf);
225  state->readRecordBuf =
226  (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
227  if (state->readRecordBuf == NULL)
228  {
229  state->readRecordBufSize = 0;
230  return false;
231  }
232  state->readRecordBufSize = newSize;
233  return true;
234 }
unsigned int uint32
Definition: c.h:441
#define Max(x, y)
Definition: c.h:980
#define MCXT_ALLOC_NO_OOM
Definition: fe_memutils.h:17
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc_extended(Size size, int flags)
Definition: mcxt.c:1132
#define AllocSizeIsValid(size)
Definition: memutils.h:42
Definition: regguts.h:318

References AllocSizeIsValid, Max, MCXT_ALLOC_NO_OOM, palloc_extended(), and pfree().

Referenced by XLogDecodeNextRecord(), and XLogReaderAllocate().

◆ DecodeXLogRecord()

bool DecodeXLogRecord ( XLogReaderState * state,
DecodedXLogRecord * decoded,
XLogRecord * record,
XLogRecPtr  lsn,
char **  errormsg 
)

Definition at line 1619 of file xlogreader.c.

/*
 * DecodeXLogRecord: decode 'record' (whose location is 'lsn') into 'decoded'.
 *
 * The fragment headers that follow the fixed-size XLogRecord header are
 * parsed and cross-checked first; then each block's image and data, and
 * finally the main data, are copied into the space following
 * decoded->blocks[], with MAXALIGN padding before block data and main data.
 * On success decoded->size holds the space actually used and true is
 * returned.  On failure *errormsg is pointed at state->errormsg_buf and
 * false is returned.
 *
 * NOTE(review): this listing appears to have lost several source lines (the
 * embedded numbering skips e.g. 1648, 1654, 1705, 1723, 1906, 1912).  In
 * particular no assignment to 'remaining' is visible before its first use,
 * and the calls that open each error-message argument list are missing --
 * presumably report_invalid_record(state, ...); confirm against the
 * original file.
 */
1624 {
1625  /*
1626  * read next _size bytes from record buffer, but check for overrun first.
1627  */
1628 #define COPY_HEADER_FIELD(_dst, _size) \
1629  do { \
1630  if (remaining < _size) \
1631  goto shortdata_err; \
1632  memcpy(_dst, ptr, _size); \
1633  ptr += _size; \
1634  remaining -= _size; \
1635  } while(0)
1636 
1637  char *ptr;
1638  char *out;
1639  uint32 remaining;
1640  uint32 datatotal;
1641  RelFileNode *rnode = NULL;
1642  uint8 block_id;
1643 
1644  decoded->header = *record;
1645  decoded->lsn = lsn;
1646  decoded->next = NULL;
1647  decoded->record_origin = InvalidRepOriginId;
1649  decoded->main_data = NULL;
1650  decoded->main_data_len = 0;
1651  decoded->max_block_id = -1;
1652  ptr = (char *) record;
1653  ptr += SizeOfXLogRecord;
1655 
1656  /* Decode the headers */
1657  datatotal = 0;
1658  while (remaining > datatotal)
1659  {
1660  COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1661 
1662  if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1663  {
1664  /* XLogRecordDataHeaderShort */
1665  uint8 main_data_len;
1666 
1667  COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1668 
1669  decoded->main_data_len = main_data_len;
1670  datatotal += main_data_len;
1671  break; /* by convention, the main data fragment is
1672  * always last */
1673  }
1674  else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1675  {
1676  /* XLogRecordDataHeaderLong */
1677  uint32 main_data_len;
1678 
1679  COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1680  decoded->main_data_len = main_data_len;
1681  datatotal += main_data_len;
1682  break; /* by convention, the main data fragment is
1683  * always last */
1684  }
1685  else if (block_id == XLR_BLOCK_ID_ORIGIN)
1686  {
1687  COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
1688  }
1689  else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
1690  {
1691  COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
1692  }
1693  else if (block_id <= XLR_MAX_BLOCK_ID)
1694  {
1695  /* XLogRecordBlockHeader */
1696  DecodedBkpBlock *blk;
1697  uint8 fork_flags;
1698 
1699  /* mark any intervening block IDs as not in use */
1700  for (int i = decoded->max_block_id + 1; i < block_id; ++i)
1701  decoded->blocks[i].in_use = false;
1702 
/* block IDs must appear in strictly increasing order */
1703  if (block_id <= decoded->max_block_id)
1704  {
1706  "out-of-order block_id %u at %X/%X",
1707  block_id,
1708  LSN_FORMAT_ARGS(state->ReadRecPtr));
1709  goto err;
1710  }
1711  decoded->max_block_id = block_id;
1712 
1713  blk = &decoded->blocks[block_id];
1714  blk->in_use = true;
1715  blk->apply_image = false;
1716 
1717  COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1718  blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1719  blk->flags = fork_flags;
1720  blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1721  blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1722 
1724 
1725  COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1726  /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1727  if (blk->has_data && blk->data_len == 0)
1728  {
1730  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1731  LSN_FORMAT_ARGS(state->ReadRecPtr));
1732  goto err;
1733  }
1734  if (!blk->has_data && blk->data_len != 0)
1735  {
1737  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1738  (unsigned int) blk->data_len,
1739  LSN_FORMAT_ARGS(state->ReadRecPtr));
1740  goto err;
1741  }
1742  datatotal += blk->data_len;
1743 
1744  if (blk->has_image)
1745  {
1746  COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1747  COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1748  COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1749 
1750  blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1751 
/*
 * hole_length is stored in the record only for compressed images that
 * have a hole; for uncompressed images it is derived from bimg_len.
 */
1752  if (BKPIMAGE_COMPRESSED(blk->bimg_info))
1753  {
1754  if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1755  COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1756  else
1757  blk->hole_length = 0;
1758  }
1759  else
1760  blk->hole_length = BLCKSZ - blk->bimg_len;
1761  datatotal += blk->bimg_len;
1762 
1763  /*
1764  * cross-check that hole_offset > 0, hole_length > 0 and
1765  * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1766  */
1767  if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1768  (blk->hole_offset == 0 ||
1769  blk->hole_length == 0 ||
1770  blk->bimg_len == BLCKSZ))
1771  {
1773  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1774  (unsigned int) blk->hole_offset,
1775  (unsigned int) blk->hole_length,
1776  (unsigned int) blk->bimg_len,
1777  LSN_FORMAT_ARGS(state->ReadRecPtr));
1778  goto err;
1779  }
1780 
1781  /*
1782  * cross-check that hole_offset == 0 and hole_length == 0 if
1783  * the HAS_HOLE flag is not set.
1784  */
1785  if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1786  (blk->hole_offset != 0 || blk->hole_length != 0))
1787  {
1789  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1790  (unsigned int) blk->hole_offset,
1791  (unsigned int) blk->hole_length,
1792  LSN_FORMAT_ARGS(state->ReadRecPtr));
1793  goto err;
1794  }
1795 
1796  /*
1797  * Cross-check that bimg_len < BLCKSZ if it is compressed.
1798  */
1799  if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
1800  blk->bimg_len == BLCKSZ)
1801  {
1803  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
1804  (unsigned int) blk->bimg_len,
1805  LSN_FORMAT_ARGS(state->ReadRecPtr));
1806  goto err;
1807  }
1808 
1809  /*
1810  * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
1811  * set nor COMPRESSED().
      *
      * NOTE(review): the message below says "block image length" but the
      * argument passed is blk->data_len, whereas the condition tests
      * blk->bimg_len.  blk->bimg_len looks like the intended argument --
      * confirm and fix in the real source.
1812  */
1813  if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1814  !BKPIMAGE_COMPRESSED(blk->bimg_info) &&
1815  blk->bimg_len != BLCKSZ)
1816  {
1818  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
1819  (unsigned int) blk->data_len,
1820  LSN_FORMAT_ARGS(state->ReadRecPtr));
1821  goto err;
1822  }
1823  }
/*
 * With BKPBLOCK_SAME_REL the relation is inherited from the most recent
 * block header that carried an explicit RelFileNode.
 */
1824  if (!(fork_flags & BKPBLOCK_SAME_REL))
1825  {
1826  COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1827  rnode = &blk->rnode;
1828  }
1829  else
1830  {
1831  if (rnode == NULL)
1832  {
1834  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1835  LSN_FORMAT_ARGS(state->ReadRecPtr));
1836  goto err;
1837  }
1838 
1839  blk->rnode = *rnode;
1840  }
1841  COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1842  }
1843  else
1844  {
1846  "invalid block_id %u at %X/%X",
1847  block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
1848  goto err;
1849  }
1850  }
1851 
/* the payload lengths announced by the headers must match what is left */
1852  if (remaining != datatotal)
1853  goto shortdata_err;
1854 
1855  /*
1856  * Ok, we've parsed the fragment headers, and verified that the total
1857  * length of the payload in the fragments is equal to the amount of data
1858  * left. Copy the data of each fragment to contiguous space after the
1859  * blocks array, inserting alignment padding before the data fragments so
1860  * they can be cast to struct pointers by REDO routines.
1861  */
1862  out = ((char *) decoded) +
1863  offsetof(DecodedXLogRecord, blocks) +
1864  sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
1865 
1866  /* block data first */
1867  for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
1868  {
1869  DecodedBkpBlock *blk = &decoded->blocks[block_id];
1870 
1871  if (!blk->in_use)
1872  continue;
1873 
1874  Assert(blk->has_image || !blk->apply_image);
1875 
1876  if (blk->has_image)
1877  {
1878  /* no need to align image */
1879  blk->bkp_image = out;
1880  memcpy(out, ptr, blk->bimg_len);
1881  ptr += blk->bimg_len;
1882  out += blk->bimg_len;
1883  }
1884  if (blk->has_data)
1885  {
1886  out = (char *) MAXALIGN(out);
1887  blk->data = out;
1888  memcpy(blk->data, ptr, blk->data_len);
1889  ptr += blk->data_len;
1890  out += blk->data_len;
1891  }
1892  }
1893 
1894  /* and finally, the main data */
1895  if (decoded->main_data_len > 0)
1896  {
1897  out = (char *) MAXALIGN(out);
1898  decoded->main_data = out;
1899  memcpy(decoded->main_data, ptr, decoded->main_data_len);
1900  ptr += decoded->main_data_len;
1901  out += decoded->main_data_len;
1902  }
1903 
1904  /* Report the actual size we used. */
1905  decoded->size = MAXALIGN(out - (char *) decoded);
1907  decoded->size);
1908 
1909  return true;
1910 
/* shortdata_err falls through into err after producing its message */
1911 shortdata_err:
1913  "record with invalid length at %X/%X",
1914  LSN_FORMAT_ARGS(state->ReadRecPtr));
1915 err:
1916  *errormsg = state->errormsg_buf;
1917 
1918  return false;
1919 }
uint32 BlockNumber
Definition: block.h:31
#define InvalidBuffer
Definition: buf.h:25
unsigned short uint16
Definition: c.h:440
#define MAXALIGN(LEN)
Definition: c.h:757
#define offsetof(type, field)
Definition: c.h:727
unsigned char uint8
Definition: c.h:439
uint32 TransactionId
Definition: c.h:587
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
#define InvalidRepOriginId
Definition: origin.h:33
uint16 hole_length
Definition: xlogreader.h:140
RelFileNode rnode
Definition: xlogreader.h:125
char * bkp_image
Definition: xlogreader.h:138
Buffer prefetch_buffer
Definition: xlogreader.h:130
BlockNumber blkno
Definition: xlogreader.h:127
ForkNumber forknum
Definition: xlogreader.h:126
uint16 hole_offset
Definition: xlogreader.h:139
XLogRecord header
Definition: xlogreader.h:166
struct DecodedXLogRecord * next
Definition: xlogreader.h:161
TransactionId toplevel_xid
Definition: xlogreader.h:168
uint32 main_data_len
Definition: xlogreader.h:170
RepOriginId record_origin
Definition: xlogreader.h:167
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogreader.h:172
XLogRecPtr lsn
Definition: xlogreader.h:164
uint32 xl_tot_len
Definition: xlogrecord.h:43
#define InvalidTransactionId
Definition: transam.h:31
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2
Definition: xlogreader.c:73
#define COPY_HEADER_FIELD(_dst, _size)
size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
Definition: xlogreader.c:1586
#define BKPBLOCK_FORK_MASK
Definition: xlogrecord.h:184
#define BKPBLOCK_HAS_DATA
Definition: xlogrecord.h:187
#define BKPIMAGE_APPLY
Definition: xlogrecord.h:147
#define BKPIMAGE_HAS_HOLE
Definition: xlogrecord.h:146
#define XLR_BLOCK_ID_DATA_LONG
Definition: xlogrecord.h:231
#define BKPIMAGE_COMPRESSED(info)
Definition: xlogrecord.h:153
#define XLR_BLOCK_ID_TOPLEVEL_XID
Definition: xlogrecord.h:233
#define XLR_BLOCK_ID_DATA_SHORT
Definition: xlogrecord.h:230
#define XLR_MAX_BLOCK_ID
Definition: xlogrecord.h:228
#define BKPBLOCK_SAME_REL
Definition: xlogrecord.h:189
#define XLR_BLOCK_ID_ORIGIN
Definition: xlogrecord.h:232
#define SizeOfXLogRecord
Definition: xlogrecord.h:55
#define BKPBLOCK_HAS_IMAGE
Definition: xlogrecord.h:186

References DecodedBkpBlock::apply_image, Assert(), DecodedBkpBlock::bimg_info, DecodedBkpBlock::bimg_len, DecodedBkpBlock::bkp_image, BKPBLOCK_FORK_MASK, BKPBLOCK_HAS_DATA, BKPBLOCK_HAS_IMAGE, BKPBLOCK_SAME_REL, BKPIMAGE_APPLY, BKPIMAGE_COMPRESSED, BKPIMAGE_HAS_HOLE, DecodedBkpBlock::blkno, DecodedXLogRecord::blocks, COPY_HEADER_FIELD, DecodedBkpBlock::data, DecodedBkpBlock::data_len, DecodeXLogRecordRequiredSpace(), DecodedBkpBlock::flags, DecodedBkpBlock::forknum, DecodedBkpBlock::has_data, DecodedBkpBlock::has_image, DecodedXLogRecord::header, DecodedBkpBlock::hole_length, DecodedBkpBlock::hole_offset, i, DecodedBkpBlock::in_use, InvalidBuffer, InvalidRepOriginId, InvalidTransactionId, DecodedXLogRecord::lsn, LSN_FORMAT_ARGS, DecodedXLogRecord::main_data, DecodedXLogRecord::main_data_len, DecodedXLogRecord::max_block_id, MAXALIGN, DecodedXLogRecord::next, offsetof, DecodedBkpBlock::prefetch_buffer, DecodedXLogRecord::record_origin, remaining, report_invalid_record(), DecodedBkpBlock::rnode, DecodedXLogRecord::size, SizeOfXLogRecord, DecodedXLogRecord::toplevel_xid, XLogRecord::xl_tot_len, XLR_BLOCK_ID_DATA_LONG, XLR_BLOCK_ID_DATA_SHORT, XLR_BLOCK_ID_ORIGIN, XLR_BLOCK_ID_TOPLEVEL_XID, and XLR_MAX_BLOCK_ID.

Referenced by XLogDecodeNextRecord(), and XLogInsertRecord().

◆ DecodeXLogRecordRequiredSpace()

size_t DecodeXLogRecordRequiredSpace ( size_t  xl_tot_len)

Definition at line 1586 of file xlogreader.c.

/*
 * DecodeXLogRecordRequiredSpace: return an upper bound on the number of
 * bytes needed to hold the decoded form of a record whose total length is
 * 'xl_tot_len'.
 *
 * The bound assumes a blocks array of the maximum size
 * (XLR_MAX_BLOCK_ID + 1 entries) and worst-case alignment padding
 * (MAXIMUM_ALIGNOF - 1 bytes) before the main data, before every block's
 * data, and at the end.
 */
1587 {
1588  size_t size = 0;
1589 
1590  /* Account for the fixed size part of the decoded record struct. */
1591  size += offsetof(DecodedXLogRecord, blocks[0]);
1592  /* Account for the flexible blocks array of maximum possible size. */
1593  size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
1594  /* Account for all the raw main and block data. */
1595  size += xl_tot_len;
1596  /* We might insert padding before main_data. */
1597  size += (MAXIMUM_ALIGNOF - 1);
1598  /* We might insert padding before each block's data. */
1599  size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
1600  /* We might insert padding at the end. */
1601  size += (MAXIMUM_ALIGNOF - 1);
1602 
1603  return size;
1604 }

References offsetof, and XLR_MAX_BLOCK_ID.

Referenced by DecodeXLogRecord(), XLogInsertRecord(), and XLogReadRecordAlloc().

◆ ReadPageInternal()

static int ReadPageInternal ( XLogReaderState * state,
XLogRecPtr  pageptr,
int  reqLen 
)
static

Definition at line 972 of file xlogreader.c.

/*
 * ReadPageInternal: make sure at least 'reqLen' bytes of the WAL page that
 * starts at 'pageptr' are available in state->readBuf, reading through the
 * state->routine.page_read callback when they are not already buffered.
 *
 * 'pageptr' must be aligned to XLOG_BLCKSZ (asserted).  Returns the number
 * of valid bytes in the buffer on success, XLREAD_WOULDBLOCK if the
 * callback returned that value, or XLREAD_FAIL if a read or a page-header
 * validation failed.  On success state->seg.ws_segno, state->segoff and
 * state->readLen are updated to describe the buffered page.
 */
973 {
974  int readLen;
975  uint32 targetPageOff;
976  XLogSegNo targetSegNo;
977  XLogPageHeader hdr;
978 
979  Assert((pageptr % XLOG_BLCKSZ) == 0);
980 
981  XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
982  targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
983 
984  /* check whether we have all the requested data already */
985  if (targetSegNo == state->seg.ws_segno &&
986  targetPageOff == state->segoff && reqLen <= state->readLen)
987  return state->readLen;
988 
989  /*
990  * Data is not in our buffer.
991  *
992  * Every time we actually read the segment, even if we looked at parts of
993  * it before, we need to do verification as the page_read callback might
994  * now be rereading data from a different source.
995  *
996  * Whenever switching to a new WAL segment, we read the first page of the
997  * file and validate its header, even if that's not where the target
998  * record is. This is so that we can check the additional identification
999  * info that is present in the first page's "long" header.
1000  */
1001  if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
1002  {
1003  XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
1004 
1005  readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
1006  state->currRecPtr,
1007  state->readBuf);
1008  if (readLen == XLREAD_WOULDBLOCK)
1009  return XLREAD_WOULDBLOCK;
1010  else if (readLen < 0)
1011  goto err;
1012 
1013  /* we can be sure to have enough WAL available, we scrolled back */
1014  Assert(readLen == XLOG_BLCKSZ);
1015 
1016  if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
1017  state->readBuf))
1018  goto err;
1019  }
1020 
1021  /*
1022  * First, read the requested data length, but at least a short page header
1023  * so that we can validate it.
1024  */
1025  readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
1026  state->currRecPtr,
1027  state->readBuf);
1028  if (readLen == XLREAD_WOULDBLOCK)
1029  return XLREAD_WOULDBLOCK;
1030  else if (readLen < 0)
1031  goto err;
1032 
1033  Assert(readLen <= XLOG_BLCKSZ);
1034 
1035  /* Do we have enough data to check the header length? */
1036  if (readLen <= SizeOfXLogShortPHD)
1037  goto err;
1038 
1039  Assert(readLen >= reqLen);
1040 
1041  hdr = (XLogPageHeader) state->readBuf;
1042 
1043  /* still not enough */
1044  if (readLen < XLogPageHeaderSize(hdr))
1045  {
1046  readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
1047  state->currRecPtr,
1048  state->readBuf);
1049  if (readLen == XLREAD_WOULDBLOCK)
1050  return XLREAD_WOULDBLOCK;
1051  else if (readLen < 0)
1052  goto err;
1053  }
1054 
1055  /*
1056  * Now that we know we have the full header, validate it.
1057  */
1058  if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
1059  goto err;
1060 
1061  /* update read state information */
1062  state->seg.ws_segno = targetSegNo;
1063  state->segoff = targetPageOff;
1064  state->readLen = readLen;
1065 
1066  return readLen;
1067 
/*
 * Error exit: if a message has already been formatted into errormsg_buf,
 * flag it as deferred so that it is not lost.
 *
 * NOTE(review): the embedded numbering skips line 1072, so one statement
 * inside this block is missing from the listing -- presumably a call to
 * XLogReaderInvalReadState(state), which the reference list for this
 * function names; confirm against the original file.
 */
1068 err:
1069  if (state->errormsg_buf[0] != '\0')
1070  {
1071  state->errormsg_deferred = true;
1073  }
1074  return XLREAD_FAIL;
1075 }
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
XLogPageHeaderData * XLogPageHeader
Definition: xlog_internal.h:54
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SizeOfXLogShortPHD
Definition: xlog_internal.h:52
#define XLogPageHeaderSize(hdr)
Definition: xlog_internal.h:84
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48
static void XLogReaderInvalReadState(XLogReaderState *state)
Definition: xlogreader.c:1081
bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr)
Definition: xlogreader.c:1190
@ XLREAD_WOULDBLOCK
Definition: xlogreader.h:354
@ XLREAD_FAIL
Definition: xlogreader.h:353
static uint32 readLen
Definition: xlogrecovery.c:230

References Assert(), if(), Max, readLen, SizeOfXLogShortPHD, XLByteToSeg, XLogPageHeaderSize, XLogReaderInvalReadState(), XLogReaderValidatePageHeader(), XLogSegmentOffset, XLREAD_FAIL, and XLREAD_WOULDBLOCK.

Referenced by XLogDecodeNextRecord(), and XLogFindNextRecord().

◆ report_invalid_record()

static void report_invalid_record ( XLogReaderState * state,
const char *  fmt,
  ... 
)
static

Definition at line 73 of file xlogreader.c.

/*
 * report_invalid_record: format an error message, printf-style, into
 * state->errormsg_buf and mark it as pending by setting
 * state->errormsg_deferred.
 *
 * The format string is first passed through _() (presumably the message
 * translation lookup -- confirm).  vsnprintf bounds the output to
 * MAX_ERRORMSG_LEN bytes, so longer messages are truncated.
 */
74 {
75  va_list args;
76 
77  fmt = _(fmt);
78 
79  va_start(args, fmt);
80  vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
81  va_end(args);
82 
83  state->errormsg_deferred = true;
84 }
#define _(x)
Definition: elog.c:89
va_end(args)
static void const char * fmt
va_start(args, fmt)
#define vsnprintf
Definition: port.h:224
#define MAX_ERRORMSG_LEN
Definition: xlogreader.c:60

References _, generate_unaccent_rules::args, fmt, MAX_ERRORMSG_LEN, va_end(), va_start(), and vsnprintf.

Referenced by DecodeXLogRecord(), RestoreBlockImage(), ValidXLogRecord(), ValidXLogRecordHeader(), XLogDecodeNextRecord(), and XLogReaderValidatePageHeader().

◆ ResetDecoder()

static void ResetDecoder ( XLogReaderState * state)
static

Definition at line 1552 of file xlogreader.c.

/*
 * ResetDecoder: return the reader's decoding state to empty.
 *
 * Every record in the decode queue is unlinked; records flagged 'oversized'
 * are pfree'd, the others are simply dropped from the queue (the decode
 * buffer head and tail are rewound below).  The current record pointer and
 * any pending error message are cleared as well.
 */
1553 {
1554  DecodedXLogRecord *r;
1555 
1556  /* Reset the decoded record queue, freeing any oversized records. */
1557  while ((r = state->decode_queue_head) != NULL)
1558  {
1559  state->decode_queue_head = r->next;
1560  if (r->oversized)
1561  pfree(r);
1562  }
1563  state->decode_queue_tail = NULL;
1564  state->decode_queue_head = NULL;
1565  state->record = NULL;
1566 
1567  /* Reset the decode buffer to empty. */
1568  state->decode_buffer_tail = state->decode_buffer;
1569  state->decode_buffer_head = state->decode_buffer;
1570 
1571  /* Clear error state. */
1572  state->errormsg_buf[0] = '\0';
1573  state->errormsg_deferred = false;
1574 }

References DecodedXLogRecord::next, DecodedXLogRecord::oversized, and pfree().

Referenced by XLogBeginRead().

◆ RestoreBlockImage()

bool RestoreBlockImage ( XLogReaderState * record,
uint8  block_id,
char *  page 
)

Definition at line 2011 of file xlogreader.c.

/*
 * RestoreBlockImage: reconstruct the full-page image stored for block
 * 'block_id' of the currently decoded record into 'page'.
 *
 * 'page' must have room for BLCKSZ bytes.  Returns true on success.
 * Returns false, without reporting an error, when the block ID is beyond
 * max_block_id, is not in use, or has no image.  Returns false after
 * calling report_invalid_record() when the image uses a compression method
 * that is unknown or not supported by this build, or when decompression
 * fails.
 */
2012 {
2013  DecodedBkpBlock *bkpb;
2014  char *ptr;
2015  PGAlignedBlock tmp;
2016 
2017  if (block_id > record->record->max_block_id ||
2018  !record->record->blocks[block_id].in_use)
2019  return false;
2020  if (!record->record->blocks[block_id].has_image)
2021  return false;
2022 
2023  bkpb = &record->record->blocks[block_id];
2024  ptr = bkpb->bkp_image;
2025 
2026  if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
2027  {
2028  /* If a backup block image is compressed, decompress it */
2029  bool decomp_success = true;
2030 
/*
 * In every branch the expected decompressed size is the page size minus
 * the hole, i.e. BLCKSZ - hole_length; the output goes to 'tmp'.
 */
2031  if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
2032  {
2033  if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
2034  BLCKSZ - bkpb->hole_length, true) < 0)
2035  decomp_success = false;
2036  }
2037  else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
2038  {
2039 #ifdef USE_LZ4
2040  if (LZ4_decompress_safe(ptr, tmp.data,
2041  bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
2042  decomp_success = false;
2043 #else
2044  report_invalid_record(record, "image at %X/%X compressed with %s not supported by build, block %d",
2045  LSN_FORMAT_ARGS(record->ReadRecPtr),
2046  "LZ4",
2047  block_id);
2048  return false;
2049 #endif
2050  }
2051  else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
2052  {
2053 #ifdef USE_ZSTD
2054  size_t decomp_result = ZSTD_decompress(tmp.data,
2055  BLCKSZ - bkpb->hole_length,
2056  ptr, bkpb->bimg_len);
2057 
2058  if (ZSTD_isError(decomp_result))
2059  decomp_success = false;
2060 #else
2061  report_invalid_record(record, "image at %X/%X compressed with %s not supported by build, block %d",
2062  LSN_FORMAT_ARGS(record->ReadRecPtr),
2063  "zstd",
2064  block_id);
2065  return false;
2066 #endif
2067  }
2068  else
2069  {
2070  report_invalid_record(record, "image at %X/%X compressed with unknown method, block %d",
2071  LSN_FORMAT_ARGS(record->ReadRecPtr),
2072  block_id);
2073  return false;
2074  }
2075 
2076  if (!decomp_success)
2077  {
2078  report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
2079  LSN_FORMAT_ARGS(record->ReadRecPtr),
2080  block_id);
2081  return false;
2082  }
2083 
/* from here on, read the image from the decompressed copy */
2084  ptr = tmp.data;
2085  }
2086 
2087  /* generate page, taking into account hole if necessary */
2088  if (bkpb->hole_length == 0)
2089  {
2090  memcpy(page, ptr, BLCKSZ);
2091  }
2092  else
2093  {
/*
 * The stored image omits the hole: copy the part before the hole, zero
 * the hole itself, then copy the remainder to just after the hole.
 */
2094  memcpy(page, ptr, bkpb->hole_offset);
2095  /* must zero-fill the hole */
2096  MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
2097  memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
2098  ptr + bkpb->hole_offset,
2099  BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
2100  }
2101 
2102  return true;
2103 }
#define MemSet(start, val, len)
Definition: c.h:1008
int32 pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize, bool check_complete)
DecodedXLogRecord * record
Definition: xlogreader.h:236
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
char data[BLCKSZ]
Definition: c.h:1138
#define BKPIMAGE_COMPRESS_ZSTD
Definition: xlogrecord.h:151
#define BKPIMAGE_COMPRESS_LZ4
Definition: xlogrecord.h:150
#define BKPIMAGE_COMPRESS_PGLZ
Definition: xlogrecord.h:149

References DecodedBkpBlock::bimg_info, DecodedBkpBlock::bimg_len, DecodedBkpBlock::bkp_image, BKPIMAGE_COMPRESS_LZ4, BKPIMAGE_COMPRESS_PGLZ, BKPIMAGE_COMPRESS_ZSTD, BKPIMAGE_COMPRESSED, DecodedXLogRecord::blocks, PGAlignedBlock::data, DecodedBkpBlock::has_image, DecodedBkpBlock::hole_length, DecodedBkpBlock::hole_offset, DecodedBkpBlock::in_use, LSN_FORMAT_ARGS, DecodedXLogRecord::max_block_id, MemSet, pglz_decompress(), XLogReaderState::ReadRecPtr, XLogReaderState::record, and report_invalid_record().

Referenced by verifyBackupPageConsistency(), and XLogReadBufferForRedoExtended().

◆ ValidXLogRecord()

static bool ValidXLogRecord ( XLogReaderState * state,
XLogRecord * record,
XLogRecPtr  recptr 
)
static

Definition at line 1161 of file xlogreader.c.

/*
 * ValidXLogRecord: verify the CRC of a complete record.
 *
 * The CRC is computed over the record payload (everything after the
 * fixed-size header, xl_tot_len - SizeOfXLogRecord bytes) followed by the
 * header up to, but not including, the xl_crc field, and compared with
 * record->xl_crc.  Returns true if they match; otherwise an error message
 * mentioning 'recptr' is produced and false is returned.
 *
 * NOTE(review): the embedded numbering skips line 1174, so the call that
 * opens the message argument list is missing from this listing --
 * presumably report_invalid_record(state, ...); confirm against the
 * original file.
 */
1162 {
1163  pg_crc32c crc;
1164 
1165  /* Calculate the CRC */
1166  INIT_CRC32C(crc);
1167  COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
1168  /* include the record header last */
1169  COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
1170  FIN_CRC32C(crc);
1171 
1172  if (!EQ_CRC32C(record->xl_crc, crc))
1173  {
1175  "incorrect resource manager data checksum in record at %X/%X",
1176  LSN_FORMAT_ARGS(recptr));
1177  return false;
1178  }
1179 
1180  return true;
1181 }
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
return crc

References COMP_CRC32C, crc, EQ_CRC32C, FIN_CRC32C, INIT_CRC32C, LSN_FORMAT_ARGS, offsetof, report_invalid_record(), SizeOfXLogRecord, XLogRecord::xl_crc, and XLogRecord::xl_tot_len.

Referenced by XLogDecodeNextRecord().

◆ ValidXLogRecordHeader()

static bool ValidXLogRecordHeader ( XLogReaderState * state,
XLogRecPtr  RecPtr,
XLogRecPtr  PrevRecPtr,
XLogRecord * record,
bool  randAccess 
)
static

Definition at line 1095 of file xlogreader.c.

/*
 * ValidXLogRecordHeader: sanity-check the fixed-size header of the record
 * at 'RecPtr'.
 *
 * Checks, in order: that xl_tot_len is at least SizeOfXLogRecord, that
 * xl_rmid is a valid resource manager ID, and that xl_prev is plausible.
 * With 'randAccess' set, xl_prev only has to be less than RecPtr; otherwise
 * it must equal 'PrevRecPtr' exactly.  Returns true if all checks pass;
 * otherwise an error message is produced and false is returned.
 *
 * NOTE(review): the embedded numbering skips the line preceding each
 * message string (1101, 1109, 1122, 1138), so the calls that open those
 * argument lists are missing from this listing -- presumably
 * report_invalid_record(state, ...); confirm against the original file.
 */
1098 {
1099  if (record->xl_tot_len < SizeOfXLogRecord)
1100  {
1102  "invalid record length at %X/%X: wanted %u, got %u",
1103  LSN_FORMAT_ARGS(RecPtr),
1104  (uint32) SizeOfXLogRecord, record->xl_tot_len);
1105  return false;
1106  }
1107  if (!RmgrIdIsValid(record->xl_rmid))
1108  {
1110  "invalid resource manager ID %u at %X/%X",
1111  record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
1112  return false;
1113  }
1114  if (randAccess)
1115  {
1116  /*
1117  * We can't exactly verify the prev-link, but surely it should be less
1118  * than the record's own address.
1119  */
1120  if (!(record->xl_prev < RecPtr))
1121  {
1123  "record with incorrect prev-link %X/%X at %X/%X",
1124  LSN_FORMAT_ARGS(record->xl_prev),
1125  LSN_FORMAT_ARGS(RecPtr));
1126  return false;
1127  }
1128  }
1129  else
1130  {
1131  /*
1132  * Record's prev-link should exactly match our previous location. This
1133  * check guards against torn WAL pages where a stale but valid-looking
1134  * WAL record starts on a sector boundary.
1135  */
1136  if (record->xl_prev != PrevRecPtr)
1137  {
1139  "record with incorrect prev-link %X/%X at %X/%X",
1140  LSN_FORMAT_ARGS(record->xl_prev),
1141  LSN_FORMAT_ARGS(RecPtr));
1142  return false;
1143  }
1144  }
1145 
1146  return true;
1147 }
#define RmgrIdIsValid(rmid)
Definition: rmgr.h:53
XLogRecPtr xl_prev
Definition: xlogrecord.h:45
RmgrId xl_rmid
Definition: xlogrecord.h:47

References LSN_FORMAT_ARGS, report_invalid_record(), RmgrIdIsValid, SizeOfXLogRecord, XLogRecord::xl_prev, XLogRecord::xl_rmid, and XLogRecord::xl_tot_len.

Referenced by XLogDecodeNextRecord().

◆ WALOpenSegmentInit()

static void WALOpenSegmentInit ( WALOpenSegment *  seg,
WALSegmentContext *  segcxt,
int  segsize,
const char *  waldir 
)
static

Definition at line 240 of file xlogreader.c.

242 {
243  seg->ws_file = -1;
244  seg->ws_segno = 0;
245  seg->ws_tli = 0;
246 
247  segcxt->ws_segsize = segsize;
248  if (waldir)
249  snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
250 }
#define MAXPGPATH
#define snprintf
Definition: port.h:225
XLogSegNo ws_segno
Definition: xlogreader.h:48
TimeLineID ws_tli
Definition: xlogreader.h:49
char ws_dir[MAXPGPATH]
Definition: xlogreader.h:55

References MAXPGPATH, snprintf, WALSegmentContext::ws_dir, WALOpenSegment::ws_file, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, and WALOpenSegment::ws_tli.

Referenced by XLogReaderAllocate().

◆ WALRead()

bool WALRead ( XLogReaderState *  state,
char *  buf,
XLogRecPtr  startptr,
Size  count,
TimeLineID  tli,
WALReadError *  errinfo 
)

Definition at line 1460 of file xlogreader.c.

1463 {
1464  char *p;
1465  XLogRecPtr recptr;
1466  Size nbytes;
1467 
1468  p = buf;
1469  recptr = startptr;
1470  nbytes = count;
1471 
1472  while (nbytes > 0)
1473  {
1474  uint32 startoff;
1475  int segbytes;
1476  int readbytes;
1477 
1478  startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1479 
1480  /*
1481  * If the data we want is not in a segment we have open, close what we
1482  * have (if anything) and open the next one, using the caller's
1483  * provided openSegment callback.
1484  */
1485  if (state->seg.ws_file < 0 ||
1486  !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1487  tli != state->seg.ws_tli)
1488  {
1489  XLogSegNo nextSegNo;
1490 
1491  if (state->seg.ws_file >= 0)
1492  state->routine.segment_close(state);
1493 
1494  XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1495  state->routine.segment_open(state, nextSegNo, &tli);
1496 
1497  /* This shouldn't happen -- indicates a bug in segment_open */
1498  Assert(state->seg.ws_file >= 0);
1499 
1500  /* Update the current segment info. */
1501  state->seg.ws_tli = tli;
1502  state->seg.ws_segno = nextSegNo;
1503  }
1504 
1505  /* How many bytes are within this segment? */
1506  if (nbytes > (state->segcxt.ws_segsize - startoff))
1507  segbytes = state->segcxt.ws_segsize - startoff;
1508  else
1509  segbytes = nbytes;
1510 
1511 #ifndef FRONTEND
1512  pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
1513 #endif
1514 
1515  /* Reset errno first; eases reporting non-errno-affecting errors */
1516  errno = 0;
1517  readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
1518 
1519 #ifndef FRONTEND
1520  pgstat_report_wait_end();
1521 #endif
1522 
1523  if (readbytes <= 0)
1524  {
1525  errinfo->wre_errno = errno;
1526  errinfo->wre_req = segbytes;
1527  errinfo->wre_read = readbytes;
1528  errinfo->wre_off = startoff;
1529  errinfo->wre_seg = state->seg;
1530  return false;
1531  }
1532 
1533  /* Update state for read */
1534  recptr += readbytes;
1535  nbytes -= readbytes;
1536  p += readbytes;
1537  }
1538 
1539  return true;
1540 }
size_t Size
Definition: c.h:540
static char * buf
Definition: pg_test_fsync.c:67
ssize_t pg_pread(int fd, void *buf, size_t nbyte, off_t offset)
Definition: pread.c:27
WALOpenSegment wre_seg
Definition: xlogreader.h:386
@ WAIT_EVENT_WAL_READ
Definition: wait_event.h:229
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:266
static void pgstat_report_wait_end(void)
Definition: wait_event.h:282
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), buf, pg_pread(), pgstat_report_wait_end(), pgstat_report_wait_start(), WAIT_EVENT_WAL_READ, WALReadError::wre_errno, WALReadError::wre_off, WALReadError::wre_read, WALReadError::wre_req, WALReadError::wre_seg, XLByteInSeg, XLByteToSeg, and XLogSegmentOffset.

Referenced by logical_read_xlog_page(), read_local_xlog_page_guts(), WALDumpReadPage(), and XLogSendPhysical().

◆ XLogBeginRead()

void XLogBeginRead ( XLogReaderState *  state,
XLogRecPtr  RecPtr 
)

Definition at line 264 of file xlogreader.c.

265 {
266  Assert(!XLogRecPtrIsInvalid(RecPtr));
267 
268  ResetDecoder(state);
269 
270  /* Begin at the passed-in record pointer. */
271  state->EndRecPtr = RecPtr;
272  state->NextRecPtr = RecPtr;
273  state->ReadRecPtr = InvalidXLogRecPtr;
274  state->DecodeRecPtr = InvalidXLogRecPtr;
275 }
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void ResetDecoder(XLogReaderState *state)
Definition: xlogreader.c:1552

References Assert(), InvalidXLogRecPtr, ResetDecoder(), and XLogRecPtrIsInvalid.

Referenced by DecodingContextFindStartpoint(), extractPageMap(), findLastCheckpoint(), pg_logical_replication_slot_advance(), pg_logical_slot_get_changes_guts(), readOneRecord(), StartLogicalReplication(), XLogFindNextRecord(), XLogPrefetcherBeginRead(), and XlogReadTwoPhaseData().

◆ XLogDecodeNextRecord()

static XLogPageReadResult XLogDecodeNextRecord ( XLogReaderState *  state,
bool  non_blocking 
)
static

Definition at line 531 of file xlogreader.c.

532 {
533  XLogRecPtr RecPtr;
534  XLogRecord *record;
535  XLogRecPtr targetPagePtr;
536  bool randAccess;
537  uint32 len,
538  total_len;
539  uint32 targetRecOff;
540  uint32 pageHeaderSize;
541  bool assembled;
542  bool gotheader;
543  int readOff;
544  DecodedXLogRecord *decoded;
545  char *errormsg; /* not used */
546 
547  /*
548  * randAccess indicates whether to verify the previous-record pointer of
549  * the record we're reading. We only do this if we're reading
550  * sequentially, which is what we initially assume.
551  */
552  randAccess = false;
553 
554  /* reset error state */
555  state->errormsg_buf[0] = '\0';
556  decoded = NULL;
557 
558  state->abortedRecPtr = InvalidXLogRecPtr;
559  state->missingContrecPtr = InvalidXLogRecPtr;
560 
561  RecPtr = state->NextRecPtr;
562 
563  if (state->DecodeRecPtr != InvalidXLogRecPtr)
564  {
565  /* read the record after the one we just read */
566 
567  /*
568  * NextRecPtr is pointing to end+1 of the previous WAL record. If
569  * we're at a page boundary, no more records can fit on the current
570  * page. We must skip over the page header, but we can't do that until
571  * we've read in the page, since the header size is variable.
572  */
573  }
574  else
575  {
576  /*
577  * Caller supplied a position to start at.
578  *
579  * In this case, NextRecPtr should already be pointing to a valid
580  * record starting position.
581  */
582  Assert(XRecOffIsValid(RecPtr));
583  randAccess = true;
584  }
585 
586 restart:
587  state->nonblocking = nonblocking;
588  state->currRecPtr = RecPtr;
589  assembled = false;
590 
591  targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
592  targetRecOff = RecPtr % XLOG_BLCKSZ;
593 
594  /*
595  * Read the page containing the record into state->readBuf. Request enough
596  * byte to cover the whole record header, or at least the part of it that
597  * fits on the same page.
598  */
599  readOff = ReadPageInternal(state, targetPagePtr,
600  Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
601  if (readOff == XLREAD_WOULDBLOCK)
602  return XLREAD_WOULDBLOCK;
603  else if (readOff < 0)
604  goto err;
605 
606  /*
607  * ReadPageInternal always returns at least the page header, so we can
608  * examine it now.
609  */
610  pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
611  if (targetRecOff == 0)
612  {
613  /*
614  * At page start, so skip over page header.
615  */
616  RecPtr += pageHeaderSize;
617  targetRecOff = pageHeaderSize;
618  }
619  else if (targetRecOff < pageHeaderSize)
620  {
621  report_invalid_record(state, "invalid record offset at %X/%X",
622  LSN_FORMAT_ARGS(RecPtr));
623  goto err;
624  }
625 
626  if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
627  targetRecOff == pageHeaderSize)
628  {
629  report_invalid_record(state, "contrecord is requested by %X/%X",
630  LSN_FORMAT_ARGS(RecPtr));
631  goto err;
632  }
633 
634  /* ReadPageInternal has verified the page header */
635  Assert(pageHeaderSize <= readOff);
636 
637  /*
638  * Read the record length.
639  *
640  * NB: Even though we use an XLogRecord pointer here, the whole record
641  * header might not fit on this page. xl_tot_len is the first field of the
642  * struct, so it must be on this page (the records are MAXALIGNed), but we
643  * cannot access any other fields until we've verified that we got the
644  * whole header.
645  */
646  record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
647  total_len = record->xl_tot_len;
648 
649  /*
650  * If the whole record header is on this page, validate it immediately.
651  * Otherwise do just a basic sanity check on xl_tot_len, and validate the
652  * rest of the header after reading it from the next page. The xl_tot_len
653  * check is necessary here to ensure that we enter the "Need to reassemble
654  * record" code path below; otherwise we might fail to apply
655  * ValidXLogRecordHeader at all.
656  */
657  if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
658  {
659  if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
660  randAccess))
661  goto err;
662  gotheader = true;
663  }
664  else
665  {
666  /* XXX: more validation should be done here */
667  if (total_len < SizeOfXLogRecord)
668  {
669  report_invalid_record(state,
670  "invalid record length at %X/%X: wanted %u, got %u",
671  LSN_FORMAT_ARGS(RecPtr),
672  (uint32) SizeOfXLogRecord, total_len);
673  goto err;
674  }
675  gotheader = false;
676  }
677 
678  /*
679  * Find space to decode this record. Don't allow oversized allocation if
680  * the caller requested nonblocking. Otherwise, we *have* to try to
681  * decode the record now because the caller has nothing else to do, so
682  * allow an oversized record to be palloc'd if that turns out to be
683  * necessary.
684  */
685  decoded = XLogReadRecordAlloc(state,
686  total_len,
687  !nonblocking /* allow_oversized */ );
688  if (decoded == NULL)
689  {
690  /*
691  * There is no space in the decode buffer. The caller should help
692  * with that problem by consuming some records.
693  */
694  if (nonblocking)
695  return XLREAD_WOULDBLOCK;
696 
697  /* We failed to allocate memory for an oversized record. */
698  report_invalid_record(state,
699  "out of memory while trying to decode a record of length %u", total_len);
700  goto err;
701  }
702 
703  len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
704  if (total_len > len)
705  {
706  /* Need to reassemble record */
707  char *contdata;
708  XLogPageHeader pageHeader;
709  char *buffer;
710  uint32 gotlen;
711 
712  assembled = true;
713 
714  /*
715  * Enlarge readRecordBuf as needed.
716  */
717  if (total_len > state->readRecordBufSize &&
718  !allocate_recordbuf(state, total_len))
719  {
720  /* We treat this as a "bogus data" condition */
721  report_invalid_record(state, "record length %u at %X/%X too long",
722  total_len, LSN_FORMAT_ARGS(RecPtr));
723  goto err;
724  }
725 
726  /* Copy the first fragment of the record from the first page. */
727  memcpy(state->readRecordBuf,
728  state->readBuf + RecPtr % XLOG_BLCKSZ, len);
729  buffer = state->readRecordBuf + len;
730  gotlen = len;
731 
732  do
733  {
734  /* Calculate pointer to beginning of next page */
735  targetPagePtr += XLOG_BLCKSZ;
736 
737  /* Wait for the next page to become available */
738  readOff = ReadPageInternal(state, targetPagePtr,
739  Min(total_len - gotlen + SizeOfXLogShortPHD,
740  XLOG_BLCKSZ));
741 
742  if (readOff == XLREAD_WOULDBLOCK)
743  return XLREAD_WOULDBLOCK;
744  else if (readOff < 0)
745  goto err;
746 
747  Assert(SizeOfXLogShortPHD <= readOff);
748 
749  pageHeader = (XLogPageHeader) state->readBuf;
750 
751  /*
752  * If we were expecting a continuation record and got an
753  * "overwrite contrecord" flag, that means the continuation record
754  * was overwritten with a different record. Restart the read by
755  * assuming the address to read is the location where we found
756  * this flag; but keep track of the LSN of the record we were
757  * reading, for later verification.
758  */
759  if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
760  {
761  state->overwrittenRecPtr = RecPtr;
762  RecPtr = targetPagePtr;
763  goto restart;
764  }
765 
766  /* Check that the continuation on next page looks valid */
767  if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
768  {
769  report_invalid_record(state,
770  "there is no contrecord flag at %X/%X",
771  LSN_FORMAT_ARGS(RecPtr));
772  goto err;
773  }
774 
775  /*
776  * Cross-check that xlp_rem_len agrees with how much of the record
777  * we expect there to be left.
778  */
779  if (pageHeader->xlp_rem_len == 0 ||
780  total_len != (pageHeader->xlp_rem_len + gotlen))
781  {
782  report_invalid_record(state,
783  "invalid contrecord length %u (expected %lld) at %X/%X",
784  pageHeader->xlp_rem_len,
785  ((long long) total_len) - gotlen,
786  LSN_FORMAT_ARGS(RecPtr));
787  goto err;
788  }
789 
790  /* Append the continuation from this page to the buffer */
791  pageHeaderSize = XLogPageHeaderSize(pageHeader);
792 
793  if (readOff < pageHeaderSize)
794  readOff = ReadPageInternal(state, targetPagePtr,
795  pageHeaderSize);
796 
797  Assert(pageHeaderSize <= readOff);
798 
799  contdata = (char *) state->readBuf + pageHeaderSize;
800  len = XLOG_BLCKSZ - pageHeaderSize;
801  if (pageHeader->xlp_rem_len < len)
802  len = pageHeader->xlp_rem_len;
803 
804  if (readOff < pageHeaderSize + len)
805  readOff = ReadPageInternal(state, targetPagePtr,
806  pageHeaderSize + len);
807 
808  memcpy(buffer, (char *) contdata, len);
809  buffer += len;
810  gotlen += len;
811 
812  /* If we just reassembled the record header, validate it. */
813  if (!gotheader)
814  {
815  record = (XLogRecord *) state->readRecordBuf;
816  if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
817  record, randAccess))
818  goto err;
819  gotheader = true;
820  }
821  } while (gotlen < total_len);
822 
823  Assert(gotheader);
824 
825  record = (XLogRecord *) state->readRecordBuf;
826  if (!ValidXLogRecord(state, record, RecPtr))
827  goto err;
828 
829  pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
830  state->DecodeRecPtr = RecPtr;
831  state->NextRecPtr = targetPagePtr + pageHeaderSize
832  + MAXALIGN(pageHeader->xlp_rem_len);
833  }
834  else
835  {
836  /* Wait for the record data to become available */
837  readOff = ReadPageInternal(state, targetPagePtr,
838  Min(targetRecOff + total_len, XLOG_BLCKSZ));
839  if (readOff == XLREAD_WOULDBLOCK)
840  return XLREAD_WOULDBLOCK;
841  else if (readOff < 0)
842  goto err;
843 
844  /* Record does not cross a page boundary */
845  if (!ValidXLogRecord(state, record, RecPtr))
846  goto err;
847 
848  state->NextRecPtr = RecPtr + MAXALIGN(total_len);
849 
850  state->DecodeRecPtr = RecPtr;
851  }
852 
853  /*
854  * Special processing if it's an XLOG SWITCH record
855  */
856  if (record->xl_rmid == RM_XLOG_ID &&
857  (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
858  {
859  /* Pretend it extends to end of segment */
860  state->NextRecPtr += state->segcxt.ws_segsize - 1;
861  state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
862  }
863 
864  if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
865  {
866  /* Record the location of the next record. */
867  decoded->next_lsn = state->NextRecPtr;
868 
869  /*
870  * If it's in the decode buffer, mark the decode buffer space as
871  * occupied.
872  */
873  if (!decoded->oversized)
874  {
875  /* The new decode buffer head must be MAXALIGNed. */
876  Assert(decoded->size == MAXALIGN(decoded->size));
877  if ((char *) decoded == state->decode_buffer)
878  state->decode_buffer_tail = state->decode_buffer + decoded->size;
879  else
880  state->decode_buffer_tail += decoded->size;
881  }
882 
883  /* Insert it into the queue of decoded records. */
884  Assert(state->decode_queue_tail != decoded);
885  if (state->decode_queue_tail)
886  state->decode_queue_tail->next = decoded;
887  state->decode_queue_tail = decoded;
888  if (!state->decode_queue_head)
889  state->decode_queue_head = decoded;
890  return XLREAD_SUCCESS;
891  }
892  else
893  return XLREAD_FAIL;
894 
895 err:
896  if (assembled)
897  {
898  /*
899  * We get here when a record that spans multiple pages needs to be
900  * assembled, but something went wrong -- perhaps a contrecord piece
901  * was lost. If caller is WAL replay, it will know where the aborted
902  * record was and where to direct followup WAL to be written, marking
903  * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
904  * in turn signal downstream WAL consumers that the broken WAL record
905  * is to be ignored.
906  */
907  state->abortedRecPtr = RecPtr;
908  state->missingContrecPtr = targetPagePtr;
909  }
910 
911  if (decoded && decoded->oversized)
912  pfree(decoded);
913 
914  /*
915  * Invalidate the read state. We might read from a different source after
916  * failure.
917  */
918  XLogReaderInvalReadState(state);
919 
920  /*
921  * If an error was written to errmsg_buf, it'll be returned to the caller
922  * of XLogReadRecord() after all successfully decoded records from the
923  * read queue.
924  */
925 
926  return XLREAD_FAIL;
927 }
#define Min(x, y)
Definition: c.h:986
#define XLOG_SWITCH
Definition: pg_control.h:71
const void size_t len
XLogRecPtr next_lsn
Definition: xlogreader.h:165
uint8 xl_info
Definition: xlogrecord.h:46
struct state * next
Definition: regguts.h:327
#define XLP_FIRST_IS_CONTRECORD
Definition: xlog_internal.h:74
#define XLP_FIRST_IS_OVERWRITE_CONTRECORD
Definition: xlog_internal.h:80
#define XRecOffIsValid(xlrp)
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength)
Definition: xlogreader.c:197
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
Definition: xlogreader.c:972
bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, XLogRecord *record, XLogRecPtr lsn, char **errormsg)
Definition: xlogreader.c:1619
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
Definition: xlogreader.c:1161
static DecodedXLogRecord * XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
Definition: xlogreader.c:467
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess)
Definition: xlogreader.c:1095
@ XLREAD_SUCCESS
Definition: xlogreader.h:352
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
static uint32 readOff
Definition: xlogrecovery.c:229

References allocate_recordbuf(), Assert(), DecodeXLogRecord(), if(), InvalidXLogRecPtr, len, LSN_FORMAT_ARGS, MAXALIGN, Min, state::next, DecodedXLogRecord::next_lsn, DecodedXLogRecord::oversized, pfree(), readOff, ReadPageInternal(), report_invalid_record(), DecodedXLogRecord::size, SizeOfXLogRecord, SizeOfXLogShortPHD, ValidXLogRecord(), ValidXLogRecordHeader(), XLogRecord::xl_info, XLogRecord::xl_rmid, XLogRecord::xl_tot_len, XLOG_SWITCH, XLogPageHeaderSize, XLogReaderInvalReadState(), XLogReadRecordAlloc(), XLogSegmentOffset, XLP_FIRST_IS_CONTRECORD, XLP_FIRST_IS_OVERWRITE_CONTRECORD, XLogPageHeaderData::xlp_info, XLogPageHeaderData::xlp_rem_len, XLR_INFO_MASK, XLREAD_FAIL, XLREAD_SUCCESS, XLREAD_WOULDBLOCK, and XRecOffIsValid.

Referenced by XLogReadAhead().

◆ XLogFindNextRecord()

XLogRecPtr XLogFindNextRecord ( XLogReaderState *  state,
XLogRecPtr  RecPtr 
)

Definition at line 1337 of file xlogreader.c.

1338 {
1339  XLogRecPtr tmpRecPtr;
1340  XLogRecPtr found = InvalidXLogRecPtr;
1341  XLogPageHeader header;
1342  char *errormsg;
1343 
1344  Assert(!XLogRecPtrIsInvalid(RecPtr));
1345 
1346  /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
1347  state->nonblocking = false;
1348 
1349  /*
1350  * skip over potential continuation data, keeping in mind that it may span
1351  * multiple pages
1352  */
1353  tmpRecPtr = RecPtr;
1354  while (true)
1355  {
1356  XLogRecPtr targetPagePtr;
1357  int targetRecOff;
1358  uint32 pageHeaderSize;
1359  int readLen;
1360 
1361  /*
1362  * Compute targetRecOff. It should typically be equal or greater than
1363  * short page-header since a valid record can't start anywhere before
1364  * that, except when caller has explicitly specified the offset that
1365  * falls somewhere there or when we are skipping multi-page
1366  * continuation record. It doesn't matter though because
1367  * ReadPageInternal() is prepared to handle that and will read at
1368  * least short page-header worth of data
1369  */
1370  targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1371 
1372  /* scroll back to page boundary */
1373  targetPagePtr = tmpRecPtr - targetRecOff;
1374 
1375  /* Read the page containing the record */
1376  readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
1377  if (readLen < 0)
1378  goto err;
1379 
1380  header = (XLogPageHeader) state->readBuf;
1381 
1382  pageHeaderSize = XLogPageHeaderSize(header);
1383 
1384  /* make sure we have enough data for the page header */
1385  readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
1386  if (readLen < 0)
1387  goto err;
1388 
1389  /* skip over potential continuation data */
1390  if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
1391  {
1392  /*
1393  * If the length of the remaining continuation data is more than
1394  * what can fit in this page, the continuation record crosses over
1395  * this page. Read the next page and try again. xlp_rem_len in the
1396  * next page header will contain the remaining length of the
1397  * continuation data
1398  *
1399  * Note that record headers are MAXALIGN'ed
1400  */
1401  if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
1402  tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1403  else
1404  {
1405  /*
1406  * The previous continuation record ends in this page. Set
1407  * tmpRecPtr to point to the first valid record
1408  */
1409  tmpRecPtr = targetPagePtr + pageHeaderSize
1410  + MAXALIGN(header->xlp_rem_len);
1411  break;
1412  }
1413  }
1414  else
1415  {
1416  tmpRecPtr = targetPagePtr + pageHeaderSize;
1417  break;
1418  }
1419  }
1420 
1421  /*
1422  * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1423  * because either we're at the first record after the beginning of a page
1424  * or we just jumped over the remaining data of a continuation.
1425  */
1426  XLogBeginRead(state, tmpRecPtr);
1427  while (XLogReadRecord(state, &errormsg) != NULL)
1428  {
1429  /* past the record we've found, break out */
1430  if (RecPtr <= state->ReadRecPtr)
1431  {
1432  /* Rewind the reader to the beginning of the last record. */
1433  found = state->ReadRecPtr;
1434  XLogBeginRead(state, found);
1435  return found;
1436  }
1437  }
1438 
1439 err:
1440  XLogReaderInvalReadState(state);
1441 
1442  return InvalidXLogRecPtr;
1443 }
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:212
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:418
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264

References Assert(), header(), InvalidXLogRecPtr, MAXALIGN, readLen, ReadPageInternal(), XLogBeginRead(), XLogPageHeaderSize, XLogReaderInvalReadState(), XLogReadRecord(), XLogRecPtrIsInvalid, and XLP_FIRST_IS_CONTRECORD.

Referenced by InitXLogReaderState(), and main().

◆ XLogNextRecord()

DecodedXLogRecord* XLogNextRecord ( XLogReaderState *  state,
char **  errormsg 
)

Definition at line 354 of file xlogreader.c.

355 {
356  /* Release the last record returned by XLogNextRecord(). */
357  XLogReleasePreviousRecord(state);
358 
359  if (state->decode_queue_head == NULL)
360  {
361  *errormsg = NULL;
362  if (state->errormsg_deferred)
363  {
364  if (state->errormsg_buf[0] != '\0')
365  *errormsg = state->errormsg_buf;
366  state->errormsg_deferred = false;
367  }
368 
369  /*
370  * state->EndRecPtr is expected to have been set by the last call to
371  * XLogBeginRead() or XLogNextRecord(), and is the location of the
372  * error.
373  */
374  Assert(!XLogRecPtrIsInvalid(state->EndRecPtr));
375 
376  return NULL;
377  }
378 
379  /*
380  * Record this as the most recent record returned, so that we'll release
381  * it next time. This also exposes it to the traditional
382  * XLogRecXXX(xlogreader) macros, which work with the decoder rather than
383  * the record for historical reasons.
384  */
385  state->record = state->decode_queue_head;
386 
387  /*
388  * Update the pointers to the beginning and one-past-the-end of this
389  * record, again for the benefit of historical code that expected the
390  * decoder to track this rather than accessing these fields of the record
391  * itself.
392  */
393  state->ReadRecPtr = state->record->lsn;
394  state->EndRecPtr = state->record->next_lsn;
395 
396  *errormsg = NULL;
397 
398  return state->record;
399 }
void XLogReleasePreviousRecord(XLogReaderState *state)
Definition: xlogreader.c:282

References Assert(), XLogRecPtrIsInvalid, and XLogReleasePreviousRecord().

Referenced by XLogPrefetcherReadRecord(), and XLogReadRecord().

◆ XLogReadAhead()

DecodedXLogRecord* XLogReadAhead ( XLogReaderState *  state,
bool  nonblocking 
)

Definition at line 938 of file xlogreader.c.

939 {
940  XLogPageReadResult result;
941 
942  if (state->errormsg_deferred)
943  return NULL;
944 
945  result = XLogDecodeNextRecord(state, nonblocking);
946  if (result == XLREAD_SUCCESS)
947  {
948  Assert(state->decode_queue_tail != NULL);
949  return state->decode_queue_tail;
950  }
951 
952  return NULL;
953 }
static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking)
Definition: xlogreader.c:531
XLogPageReadResult
Definition: xlogreader.h:351

References Assert(), XLogDecodeNextRecord(), and XLREAD_SUCCESS.

Referenced by XLogPrefetcherNextBlock(), and XLogReadRecord().

◆ XLogReaderAllocate()

XLogReaderState* XLogReaderAllocate ( int  wal_segment_size,
const char *  waldir,
XLogReaderRoutine *  routine,
void *  private_data 
)

Definition at line 108 of file xlogreader.c.

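110 {
111  XLogReaderState *state;
112 
113  state = (XLogReaderState *)
114  palloc_extended(sizeof(XLogReaderState),
115  MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);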
116  if (!state)
117  return NULL;
118 
119  /* initialize caller-provided support functions */
120  state->routine = *routine;
121 
122  /*
123  * Permanently allocate readBuf. We do it this way, rather than just
124  * making a static array, for two reasons: (1) no need to waste the
125  * storage in most instantiations of the backend; (2) a static char array
126  * isn't guaranteed to have any particular alignment, whereas
127  * palloc_extended() will provide MAXALIGN'd storage.
128  */
129  state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
130  MCXT_ALLOC_NO_OOM);
131  if (!state->readBuf)
132  {
133  pfree(state);
134  return NULL;
135  }
136 
137  /* Initialize segment info. */
138  WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
139  waldir);
140 
141  /* system_identifier initialized to zeroes above */
142  state->private_data = private_data;
143  /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
144  state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
145  MCXT_ALLOC_NO_OOM);
146  if (!state->errormsg_buf)
147  {
148  pfree(state->readBuf);
149  pfree(state);
150  return NULL;
151  }
152  state->errormsg_buf[0] = '\0';
153 
154  /*
155  * Allocate an initial readRecordBuf of minimal size, which can later be
156  * enlarged if necessary.
157  */
158  if (!allocate_recordbuf(state, 0))
159  {
160  pfree(state->errormsg_buf);
161  pfree(state->readBuf);
162  pfree(state);
163  return NULL;
164  }
165 
166  return state;
167 }
#define MCXT_ALLOC_ZERO
Definition: fe_memutils.h:18
int wal_segment_size
Definition: xlog.c:144
static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:240

References allocate_recordbuf(), MAX_ERRORMSG_LEN, MCXT_ALLOC_NO_OOM, MCXT_ALLOC_ZERO, palloc_extended(), pfree(), wal_segment_size, and WALOpenSegmentInit().

Referenced by extractPageMap(), findLastCheckpoint(), InitWalRecovery(), InitXLogReaderState(), main(), readOneRecord(), StartReplication(), StartupDecodingContext(), XLogInsertRecord(), and XlogReadTwoPhaseData().

◆ XLogReaderFree()

void XLogReaderFree ( XLogReaderState *  state)

Definition at line 170 of file xlogreader.c.

171 {
172  if (state->seg.ws_file != -1)
173  state->routine.segment_close(state);
174 
175  if (state->decode_buffer && state->free_decode_buffer)
176  pfree(state->decode_buffer);
177 
178  pfree(state->errormsg_buf);
179  if (state->readRecordBuf)
180  pfree(state->readRecordBuf);
181  pfree(state->readBuf);
182  pfree(state);
183 }

References pfree().

Referenced by extractPageMap(), findLastCheckpoint(), FreeDecodingContext(), GetWALRecordsInfo(), GetWalStats(), main(), pg_get_wal_record_info(), readOneRecord(), ShutdownWalRecovery(), and XlogReadTwoPhaseData().

◆ XLogReaderInvalReadState()

static void XLogReaderInvalReadState ( XLogReaderState *  state)
static

Definition at line 1081 of file xlogreader.c.

1082 {
1083  state->seg.ws_segno = 0;
1084  state->segoff = 0;
1085  state->readLen = 0;
1086 }

Referenced by ReadPageInternal(), XLogDecodeNextRecord(), and XLogFindNextRecord().

◆ XLogReaderSetDecodeBuffer()

void XLogReaderSetDecodeBuffer ( XLogReaderState *  state,
void *  buffer,
size_t  size 
)

Definition at line 92 of file xlogreader.c.

93 {
94  Assert(state->decode_buffer == NULL);
95 
96  state->decode_buffer = buffer;
97  state->decode_buffer_size = size;
98  state->decode_buffer_tail = buffer;
99  state->decode_buffer_head = buffer;
100 }

References Assert().

Referenced by InitWalRecovery().

◆ XLogReaderValidatePageHeader()

bool XLogReaderValidatePageHeader ( XLogReaderState *  state,
XLogRecPtr  recptr,
char *  phdr 
)

Definition at line 1190 of file xlogreader.c.

1192 {
1193  XLogRecPtr recaddr;
1194  XLogSegNo segno;
1195  int32 offset;
1196  XLogPageHeader hdr = (XLogPageHeader) phdr;
1197 
1198  Assert((recptr % XLOG_BLCKSZ) == 0);
1199 
1200  XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
1201  offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1202 
1203  XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
1204 
1205  if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
1206  {
1207  char fname[MAXFNAMELEN];
1208 
1209  XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
1210 
1211  report_invalid_record(state,
1212  "invalid magic number %04X in log segment %s, offset %u",
1213  hdr->xlp_magic,
1214  fname,
1215  offset);
1216  return false;
1217  }
1218 
1219  if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
1220  {
1221  char fname[MAXFNAMELEN];
1222 
1223  XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
1224 
1225  report_invalid_record(state,
1226  "invalid info bits %04X in log segment %s, offset %u",
1227  hdr->xlp_info,
1228  fname,
1229  offset);
1230  return false;
1231  }
1232 
1233  if (hdr->xlp_info & XLP_LONG_HEADER)
1234  {
1235  XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
1236 
1237  if (state->system_identifier &&
1238  longhdr->xlp_sysid != state->system_identifier)
1239  {
1240  report_invalid_record(state,
1241  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
1242  (unsigned long long) longhdr->xlp_sysid,
1243  (unsigned long long) state->system_identifier);
1244  return false;
1245  }
1246  else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
1247  {
1248  report_invalid_record(state,
1249  "WAL file is from different database system: incorrect segment size in page header");
1250  return false;
1251  }
1252  else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
1253  {
1254  report_invalid_record(state,
1255  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
1256  return false;
1257  }
1258  }
1259  else if (offset == 0)
1260  {
1261  char fname[MAXFNAMELEN];
1262 
1263  XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
1264 
1265  /* hmm, first page of file doesn't have a long header? */
1266  report_invalid_record(state,
1267  "invalid info bits %04X in log segment %s, offset %u",
1268  hdr->xlp_info,
1269  fname,
1270  offset);
1271  return false;
1272  }
1273 
1274  /*
1275  * Check that the address on the page agrees with what we expected. This
1276  * check typically fails when an old WAL segment is recycled, and hasn't
1277  * yet been overwritten with new data.
1278  */
1279  if (hdr->xlp_pageaddr != recaddr)
1280  {
1281  char fname[MAXFNAMELEN];
1282 
1283  XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
1284 
1285  report_invalid_record(state,
1286  "unexpected pageaddr %X/%X in log segment %s, offset %u",
1287  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
1288  fname,
1289  offset);
1290  return false;
1291  }
1292 
1293  /*
1294  * Since child timelines are always assigned a TLI greater than their
1295  * immediate parent's TLI, we should never see TLI go backwards across
1296  * successive pages of a consistent WAL sequence.
1297  *
1298  * Sometimes we re-read a segment that's already been (partially) read. So
1299  * we only verify TLIs for pages that are later than the last remembered
1300  * LSN.
1301  */
1302  if (recptr > state->latestPagePtr)
1303  {
1304  if (hdr->xlp_tli < state->latestPageTLI)
1305  {
1306  char fname[MAXFNAMELEN];
1307 
1308  XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
1309 
1310  report_invalid_record(state,
1311  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
1312  hdr->xlp_tli,
1313  state->latestPageTLI,
1314  fname,
1315  offset);
1316  return false;
1317  }
1318  }
1319  state->latestPagePtr = recptr;
1320  state->latestPageTLI = hdr->xlp_tli;
1321 
1322  return true;
1323 }
signed int int32
Definition: c.h:429
TimeLineID xlp_tli
Definition: xlog_internal.h:40
XLogRecPtr xlp_pageaddr
Definition: xlog_internal.h:41
XLogLongPageHeaderData * XLogLongPageHeader
Definition: xlog_internal.h:71
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define MAXFNAMELEN
#define XLP_LONG_HEADER
Definition: xlog_internal.h:76
#define XLP_ALL_FLAGS
Definition: xlog_internal.h:82
#define XLOG_PAGE_MAGIC
Definition: xlog_internal.h:34

References Assert(), LSN_FORMAT_ARGS, MAXFNAMELEN, report_invalid_record(), XLByteToSeg, XLOG_PAGE_MAGIC, XLogFileName, XLogSegmentOffset, XLogSegNoOffsetToRecPtr, XLP_ALL_FLAGS, XLogPageHeaderData::xlp_info, XLP_LONG_HEADER, XLogPageHeaderData::xlp_magic, XLogPageHeaderData::xlp_pageaddr, XLogLongPageHeaderData::xlp_seg_size, XLogLongPageHeaderData::xlp_sysid, XLogPageHeaderData::xlp_tli, and XLogLongPageHeaderData::xlp_xlog_blcksz.

Referenced by ReadPageInternal(), and XLogPageRead().

◆ XLogReadRecord()

XLogRecord* XLogReadRecord ( XLogReaderState state,
char **  errormsg 
)

Definition at line 418 of file xlogreader.c.

/*
 * XLogReadRecord body (the signature is given by the Doxygen prototype
 * above this listing).
 *
 * Returns a pointer to the header of the record handed back by
 * XLogNextRecord(), or NULL when XLogNextRecord() returns no record.
 * 'errormsg' is passed straight through to XLogNextRecord().
 *
 * NOTE(review): this listing is not contiguous -- the embedded numbering
 * jumps 425 -> 427 and 431 -> 433, so source lines 426 and 432 are not
 * shown here.  Confirm against xlogreader.c before relying on the listing.
 */
419 {
420  DecodedXLogRecord *decoded;
421 
422  /*
423  * Release last returned record, if there is one. We need to do this so
424  * that we can check for empty decode queue accurately.
425  */
/*
 * NOTE(review): source line 426 is missing from the listing.  The
 * "References" list for this function names XLogReleasePreviousRecord(),
 * which presumably is the call that belongs here -- TODO confirm.
 */
427 
428  /*
429  * Call XLogReadAhead() in blocking mode to make sure there is something
430  * in the queue, though we don't use the result.
431  */
/*
 * NOTE(review): source line 432 is missing from the listing.  The
 * "References" list names XLogReaderHasQueuedRecordOrError(), so the call
 * below is presumably guarded by a condition on it -- TODO confirm.
 */
433  XLogReadAhead(state, false /* nonblocking */ );
434 
435  /* Consume the head record or error. */
436  decoded = XLogNextRecord(state, errormsg);
437  if (decoded)
438  {
439  /*
440  * This function returns a pointer to the record's header, not the
441  * actual decoded record. The caller will access the decoded record
442  * through the XLogRecGetXXX() macros, which reach the decoded
443  * record as xlogreader->record.
444  */
445  Assert(state->record == decoded);
446  return &decoded->header;
447  }
448 
449  return NULL;
450 }
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:354
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
Definition: xlogreader.c:938
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
Definition: xlogreader.h:325

References Assert(), DecodedXLogRecord::header, XLogNextRecord(), XLogReadAhead(), XLogReaderHasQueuedRecordOrError(), and XLogReleasePreviousRecord().

Referenced by DecodingContextFindStartpoint(), extractPageMap(), findLastCheckpoint(), main(), pg_logical_replication_slot_advance(), pg_logical_slot_get_changes_guts(), ReadNextXLogRecord(), readOneRecord(), XLogFindNextRecord(), XlogReadTwoPhaseData(), and XLogSendLogical().

◆ XLogReadRecordAlloc()

static DecodedXLogRecord* XLogReadRecordAlloc ( XLogReaderState state,
size_t  xl_tot_len,
bool  allow_oversized 
)
static

Definition at line 467 of file xlogreader.c.

468 {
469  size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
470  DecodedXLogRecord *decoded = NULL;
471 
472  /* Allocate a circular decode buffer if we don't have one already. */
473  if (unlikely(state->decode_buffer == NULL))
474  {
475  if (state->decode_buffer_size == 0)
476  state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
477  state->decode_buffer = palloc(state->decode_buffer_size);
478  state->decode_buffer_head = state->decode_buffer;
479  state->decode_buffer_tail = state->decode_buffer;
480  state->free_decode_buffer = true;
481  }
482 
483  /* Try to allocate space in the circular decode buffer. */
484  if (state->decode_buffer_tail >= state->decode_buffer_head)
485  {
486  /* Empty, or tail is to the right of head. */
487  if (state->decode_buffer_tail + required_space <=
488  state->decode_buffer + state->decode_buffer_size)
489  {
490  /* There is space between tail and end. */
491  decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
492  decoded->oversized = false;
493  return decoded;
494  }
495  else if (state->decode_buffer + required_space <
496  state->decode_buffer_head)
497  {
498  /* There is space between start and head. */
499  decoded = (DecodedXLogRecord *) state->decode_buffer;
500  decoded->oversized = false;
501  return decoded;
502  }
503  }
504  else
505  {
506  /* Tail is to the left of head. */
507  if (state->decode_buffer_tail + required_space <
508  state->decode_buffer_head)
509  {
510  /* There is space between tail and head. */
511  decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
512  decoded->oversized = false;
513  return decoded;
514  }
515  }
516 
517  /* Not enough space in the decode buffer. Are we allowed to allocate? */
518  if (allow_oversized)
519  {
520  decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
521  if (decoded == NULL)
522  return NULL;
523  decoded->oversized = true;
524  return decoded;
525  }
526 
527  return NULL;
528 }
#define unlikely(x)
Definition: c.h:273
void * palloc(Size size)
Definition: mcxt.c:1068
#define DEFAULT_DECODE_BUFFER_SIZE
Definition: xlogreader.c:66

References DecodeXLogRecordRequiredSpace(), DEFAULT_DECODE_BUFFER_SIZE, MCXT_ALLOC_NO_OOM, DecodedXLogRecord::oversized, palloc(), palloc_extended(), and unlikely.

Referenced by XLogDecodeNextRecord().

◆ XLogRecGetBlockData()

char* XLogRecGetBlockData ( XLogReaderState record,
uint8  block_id,
Size len 
)

◆ XLogRecGetBlockTag()

void XLogRecGetBlockTag ( XLogReaderState record,
uint8  block_id,
RelFileNode rnode,
ForkNumber forknum,
BlockNumber blknum 
)

Definition at line 1928 of file xlogreader.c.

1930 {
1931  if (!XLogRecGetBlockTagExtended(record, block_id, rnode, forknum, blknum,
1932  NULL))
1933  {
1934 #ifndef FRONTEND
1935  elog(ERROR, "failed to locate backup block with ID %d in WAL record",
1936  block_id);
1937 #else
1938  pg_fatal("failed to locate backup block with ID %d in WAL record",
1939  block_id);
1940 #endif
1941  }
1942 }
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define pg_fatal(...)
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
Definition: xlogreader.c:1953

References elog, ERROR, pg_fatal, and XLogRecGetBlockTagExtended().

Referenced by brin_xlog_revmap_extend(), btree_xlog_delete(), btree_xlog_split(), DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeUpdate(), gistRedoDeleteRecord(), gistRedoPageSplitRecord(), hash_xlog_add_ovfl_page(), hash_xlog_init_bitmap_page(), hash_xlog_init_meta_page(), hash_xlog_vacuum_one_page(), heap_xlog_delete(), heap_xlog_freeze_page(), heap_xlog_insert(), heap_xlog_lock(), heap_xlog_lock_updated(), heap_xlog_multi_insert(), heap_xlog_prune(), heap_xlog_update(), heap_xlog_vacuum(), heap_xlog_visible(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgRedoVacuumRedirect().

◆ XLogRecGetBlockTagExtended()

bool XLogRecGetBlockTagExtended ( XLogReaderState record,
uint8  block_id,
RelFileNode rnode,
ForkNumber forknum,
BlockNumber blknum,
Buffer prefetch_buffer 
)

Definition at line 1953 of file xlogreader.c.

1957 {
1958  DecodedBkpBlock *bkpb;
1959 
1960  if (!XLogRecHasBlockRef(record, block_id))
1961  return false;
1962 
1963  bkpb = &record->record->blocks[block_id];
1964  if (rnode)
1965  *rnode = bkpb->rnode;
1966  if (forknum)
1967  *forknum = bkpb->forknum;
1968  if (blknum)
1969  *blknum = bkpb->blkno;
1970  if (prefetch_buffer)
1971  *prefetch_buffer = bkpb->prefetch_buffer;
1972  return true;
1973 }
#define XLogRecHasBlockRef(decoder, block_id)
Definition: xlogreader.h:418

References DecodedBkpBlock::blkno, DecodedXLogRecord::blocks, DecodedBkpBlock::forknum, DecodedBkpBlock::prefetch_buffer, XLogReaderState::record, DecodedBkpBlock::rnode, and XLogRecHasBlockRef.

Referenced by btree_xlog_split(), extractPageInfo(), heap_xlog_update(), verifyBackupPageConsistency(), xlog_block_info(), XLogReadBufferForRedoExtended(), XLogRecGetBlockRefInfo(), XLogRecGetBlockTag(), and XLogRecordMatchesRelationBlock().

◆ XLogRecGetFullXid()

FullTransactionId XLogRecGetFullXid ( XLogReaderState record)

Definition at line 2111 of file xlogreader.c.

/*
 * XLogRecGetFullXid body (the signature is given by the Doxygen prototype
 * above this listing; the declared return type is FullTransactionId).
 *
 * The visible code reads the record's xid with XLogRecGetXid() and steps
 * 'epoch' back by one when that xid is numerically greater than 'next_xid'.
 *
 * NOTE(review): this listing is not contiguous -- source lines 2121, 2124,
 * 2125 and 2134 are not shown, so the assignments to 'next_xid' and 'epoch'
 * and the return statement are not visible here.  The "References" list for
 * this function names Assert(), AmStartupProcess, IsUnderPostmaster,
 * ShmemVariableCache, nextXid, XidFromFullTransactionId,
 * EpochFromFullTransactionId and FullTransactionIdFromEpochAndXid(), which
 * presumably appear on the omitted lines -- confirm against xlogreader.c.
 */
2112 {
2113  TransactionId xid,
2114  next_xid;
2115  uint32 epoch;
2116 
2117  /*
2118  * This function is only safe during replay, because it depends on the
2119  * replay state. See AdvanceNextFullTransactionIdPastXid() for more.
2120  */
/* NOTE(review): source line 2121 is missing from the listing. */
2122 
2123  xid = XLogRecGetXid(record);
/*
 * NOTE(review): source lines 2124-2125 are missing from the listing;
 * 'next_xid' and 'epoch' are presumably assigned there -- TODO confirm.
 */
2126 
2127  /*
2128  * If xid is numerically greater than next_xid, it has to be from the last
2129  * epoch.
2130  */
2131  if (unlikely(xid > next_xid))
2132  --epoch;
2133 
/*
 * NOTE(review): source line 2134 is missing from the listing; presumably it
 * is the return statement -- TODO confirm.
 */
2135 }
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
bool IsUnderPostmaster
Definition: globals.c:113
#define AmStartupProcess()
Definition: miscadmin.h:444
FullTransactionId nextXid
Definition: transam.h:220
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static FullTransactionId FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
Definition: transam.h:71
VariableCache ShmemVariableCache
Definition: varsup.c:34
#define XLogRecGetXid(decoder)
Definition: xlogreader.h:410

References AmStartupProcess, Assert(), epoch, EpochFromFullTransactionId, FullTransactionIdFromEpochAndXid(), IsUnderPostmaster, VariableCacheData::nextXid, ShmemVariableCache, unlikely, XidFromFullTransactionId, and XLogRecGetXid.

◆ XLogReleasePreviousRecord()

void XLogReleasePreviousRecord ( XLogReaderState state)

Definition at line 282 of file xlogreader.c.

283 {
284  DecodedXLogRecord *record;
285 
286  if (!state->record)
287  return;
288 
289  /*
290  * Remove it from the decoded record queue. It must be the oldest item
291  * decoded, decode_queue_head.
292  */
293  record = state->record;
294  Assert(record == state->decode_queue_head);
295  state->record = NULL;
296  state->decode_queue_head = record->next;
297 
298  /* It might also be the newest item decoded, decode_queue_tail. */
299  if (state->decode_queue_tail == record)
300  state->decode_queue_tail = NULL;
301 
302  /* Release the space. */
303  if (unlikely(record->oversized))
304  {
305  /* It's not in the decode buffer, so free it to release space. */
306  pfree(record);
307  }
308  else
309  {
310  /* It must be the head (oldest) record in the decode buffer. */
311  Assert(state->decode_buffer_head == (char *) record);
312 
313  /*
314  * We need to update head to point to the next record that is in the
315  * decode buffer, if any, being careful to skip oversized ones
316  * (they're not in the decode buffer).
317  */
318  record = record->next;
319  while (unlikely(record && record->oversized))
320  record = record->next;
321 
322  if (record)
323  {
324  /* Adjust head to release space up to the next record. */
325  state->decode_buffer_head = (char *) record;
326  }
327  else
328  {
329  /*
330  * Otherwise we might as well just reset head and tail to the
331  * start of the buffer space, because we're empty. This means
332  * we'll keep overwriting the same piece of memory if we're not
333  * doing any prefetching.
334  */
335  state->decode_buffer_head = state->decode_buffer;
336  state->decode_buffer_tail = state->decode_buffer;
337  }
338  }
339 }

References Assert(), DecodedXLogRecord::next, DecodedXLogRecord::oversized, pfree(), and unlikely.

Referenced by XLogNextRecord(), XLogPrefetcherReadRecord(), and XLogReadRecord().