PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_waldump.h File Reference
Include dependency graph for pg_waldump.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  XLogDumpPrivate
 

Typedefs

typedef struct XLogDumpPrivate XLogDumpPrivate
 

Functions

int open_file_in_directory (const char *directory, const char *fname)
 
void init_archive_reader (XLogDumpPrivate *privateInfo, pg_compress_algorithm compression)
 
void free_archive_reader (XLogDumpPrivate *privateInfo)
 
int read_archive_wal_page (XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, Size count, char *readBuff)
 
void free_archive_wal_entry (const char *fname, XLogDumpPrivate *privateInfo)
 

Variables

char * TmpWalSegDir
 

Typedef Documentation

◆ XLogDumpPrivate

Function Documentation

◆ free_archive_reader()

void free_archive_reader ( XLogDumpPrivate * privateInfo)
extern

Definition at line 234 of file archive_waldump.c.

235{
236 /*
237 * NB: Normally, astreamer_finalize() is called before astreamer_free() to
238 * flush any remaining buffered data or to ensure the end of the tar
239 * archive is reached. However, when decoding WAL, once we hit the end
240 * LSN, any remaining buffered data or unread portion of the archive can
241 * be safely ignored.
242 */
243 astreamer_free(privateInfo->archive_streamer);
244
245 /* Free any remaining hash table entries and their buffers. */
246 if (privateInfo->archive_wal_htab != NULL)
247 {
249 ArchivedWALFile *entry;
250
251 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
252 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
253 &iter)) != NULL)
254 {
255 if (entry->buf != NULL)
256 destroyStringInfo(entry->buf);
257 }
259 privateInfo->archive_wal_htab = NULL;
260 }
261
262 /* Free the reusable read buffer. */
263 if (privateInfo->archive_read_buf != NULL)
264 {
265 pg_free(privateInfo->archive_read_buf);
266 privateInfo->archive_read_buf = NULL;
267 }
268
269 /* Close the file. */
270 if (close(privateInfo->archive_fd) != 0)
271 pg_log_error("could not close file \"%s\": %m",
272 privateInfo->archive_name);
273}
static void astreamer_free(astreamer *streamer)
Definition astreamer.h:153
void pg_free(void *ptr)
#define close(a)
Definition win32.h:12
#define pg_log_error(...)
Definition logging.h:106
static int fb(int x)
void destroyStringInfo(StringInfo str)
Definition stringinfo.c:409
struct ArchivedWAL_hash * archive_wal_htab
Definition pg_waldump.h:54
astreamer * archive_streamer
Definition pg_waldump.h:39
char * archive_read_buf
Definition pg_waldump.h:40
char * archive_name
Definition pg_waldump.h:36

References XLogDumpPrivate::archive_fd, XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_read_buf, XLogDumpPrivate::archive_streamer, XLogDumpPrivate::archive_wal_htab, astreamer_free(), ArchivedWALFile::buf, close, destroyStringInfo(), fb(), pg_free(), and pg_log_error.

Referenced by main().

◆ free_archive_wal_entry()

void free_archive_wal_entry ( const char * fname,
XLogDumpPrivate * privateInfo 
)
extern

Definition at line 415 of file archive_waldump.c.

416{
417 ArchivedWALFile *entry;
418
419 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
420
421 if (entry == NULL)
422 return;
423
424 /* Destroy the buffer */
425 destroyStringInfo(entry->buf);
426 entry->buf = NULL;
427
428 /* Remove temporary file if any */
429 if (entry->spilled)
430 {
431 char fpath[MAXPGPATH];
432
433 snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
434
435 if (unlink(fpath) == 0)
436 pg_log_debug("removed file \"%s\"", fpath);
437 }
438
439 /* Clear cur_file if it points to the entry being freed */
440 if (privateInfo->cur_file == entry)
441 privateInfo->cur_file = NULL;
442
443 ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
444}
char * TmpWalSegDir
#define pg_log_debug(...)
Definition logging.h:133
#define MAXPGPATH
#define snprintf
Definition port.h:260
struct ArchivedWALFile * cur_file
Definition pg_waldump.h:47

References XLogDumpPrivate::archive_wal_htab, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, destroyStringInfo(), fb(), MAXPGPATH, pg_log_debug, snprintf, ArchivedWALFile::spilled, and TmpWalSegDir.

Referenced by init_archive_reader(), and TarWALDumpReadPage().

◆ init_archive_reader()

void init_archive_reader ( XLogDumpPrivate * privateInfo,
pg_compress_algorithm  compression 
)
extern

Definition at line 124 of file archive_waldump.c.

126{
127 int fd;
128 astreamer *streamer;
129 ArchivedWALFile *entry = NULL;
131 XLogSegNo segno;
132 TimeLineID timeline;
133
134 /* Open tar archive and store its file descriptor */
136 privateInfo->archive_name);
137
138 if (fd < 0)
139 pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
140
141 privateInfo->archive_fd = fd;
142
143 streamer = astreamer_waldump_new(privateInfo);
144
145 /* We must first parse the tar archive. */
146 streamer = astreamer_tar_parser_new(streamer);
147
148 /* If the archive is compressed, decompress before parsing. */
149 if (compression == PG_COMPRESSION_GZIP)
150 streamer = astreamer_gzip_decompressor_new(streamer);
151 else if (compression == PG_COMPRESSION_LZ4)
152 streamer = astreamer_lz4_decompressor_new(streamer);
153 else if (compression == PG_COMPRESSION_ZSTD)
154 streamer = astreamer_zstd_decompressor_new(streamer);
155
156 privateInfo->archive_streamer = streamer;
157
158 /*
159 * Allocate a buffer for reading the archive file to facilitate content
160 * decoding; read requests must not exceed the allocated buffer size.
161 */
163
164#ifdef USE_ASSERT_CHECKING
165 privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
166#endif
167
168 /*
169 * Hash table storing WAL entries read from the archive with an arbitrary
170 * initial size.
171 */
172 privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
173
174 /*
175 * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
176 * the first WAL segment in the archive so we can extract the WAL segment
177 * size from the long page header.
178 */
179 while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
180 {
181 if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
182 pg_fatal("could not find WAL in archive \"%s\"",
183 privateInfo->archive_name);
184
185 entry = privateInfo->cur_file;
186 }
187
188 /* Extract the WAL segment size from the long page header */
190
191 if (!IsValidWalSegSize(longhdr->xlp_seg_size))
192 {
193 pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
194 "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
195 longhdr->xlp_seg_size),
196 privateInfo->archive_name, longhdr->xlp_seg_size);
197 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
198 exit(1);
199 }
200
201 privateInfo->segsize = longhdr->xlp_seg_size;
202
203 /*
204 * With the WAL segment size available, we can now initialize the
205 * dependent start and end segment numbers.
206 */
207 Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
208 XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
209 privateInfo->segsize);
210
211 if (!XLogRecPtrIsInvalid(privateInfo->endptr))
212 XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
213 privateInfo->segsize);
214
215 /*
216 * This WAL record was fetched before the filtering parameters
217 * (start_segno and end_segno) were fully initialized. Perform the
218 * relevance check against the user-provided range now; if the WAL falls
219 * outside this range, remove it from the hash table. Subsequent WAL will
220 * be filtered automatically by the archive streamer using the updated
221 * start_segno and end_segno values.
222 */
223 XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
224 if (privateInfo->timeline != timeline ||
225 privateInfo->start_segno > segno ||
226 privateInfo->end_segno < segno)
227 free_archive_wal_entry(entry->fname, privateInfo);
228}
static astreamer * astreamer_waldump_new(XLogDumpPrivate *privateInfo)
#define READ_CHUNK_SIZE
static int read_archive_file(XLogDumpPrivate *privateInfo, Size count)
void free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
astreamer * astreamer_tar_parser_new(astreamer *next)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
#define ngettext(s, p, n)
Definition c.h:1272
#define Assert(condition)
Definition c.h:945
@ PG_COMPRESSION_GZIP
Definition compression.h:24
@ PG_COMPRESSION_LZ4
Definition compression.h:25
@ PG_COMPRESSION_ZSTD
Definition compression.h:26
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
#define pg_log_error_detail(...)
Definition logging.h:109
#define pg_fatal(...)
int open_file_in_directory(const char *directory, const char *fname)
Definition pg_waldump.c:180
static int fd(const char *x, int i)
const char * fname
XLogSegNo start_segno
Definition pg_waldump.h:62
XLogRecPtr endptr
Definition pg_waldump.h:30
XLogRecPtr startptr
Definition pg_waldump.h:29
TimeLineID timeline
Definition pg_waldump.h:27
char * archive_dir
Definition pg_waldump.h:35
XLogSegNo end_segno
Definition pg_waldump.h:63
#define IsValidWalSegSize(size)
XLogLongPageHeaderData * XLogLongPageHeader
static void XLogFromFileName(const char *fname, TimeLineID *tli, XLogSegNo *logSegNo, int wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsInvalid(r)
Definition xlogdefs.h:30
uint32 TimeLineID
Definition xlogdefs.h:63
uint64 XLogSegNo
Definition xlogdefs.h:52

References XLogDumpPrivate::archive_dir, XLogDumpPrivate::archive_fd, XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_read_buf, XLogDumpPrivate::archive_streamer, XLogDumpPrivate::archive_wal_htab, Assert, astreamer_gzip_decompressor_new(), astreamer_lz4_decompressor_new(), astreamer_tar_parser_new(), astreamer_waldump_new(), astreamer_zstd_decompressor_new(), ArchivedWALFile::buf, XLogDumpPrivate::cur_file, StringInfoData::data, XLogDumpPrivate::end_segno, XLogDumpPrivate::endptr, fb(), fd(), ArchivedWALFile::fname, free_archive_wal_entry(), IsValidWalSegSize, StringInfoData::len, ngettext, open_file_in_directory(), PG_COMPRESSION_GZIP, PG_COMPRESSION_LZ4, PG_COMPRESSION_ZSTD, pg_fatal, pg_log_error, pg_log_error_detail, pg_malloc(), read_archive_file(), READ_CHUNK_SIZE, XLogDumpPrivate::segsize, XLogDumpPrivate::start_segno, XLogDumpPrivate::startptr, XLogDumpPrivate::timeline, XLByteToSeg, XLogFromFileName(), and XLogRecPtrIsInvalid.

Referenced by main().

◆ open_file_in_directory()

int open_file_in_directory ( const char * directory,
const char * fname 
)
extern

Definition at line 180 of file pg_waldump.c.

181{
182 int fd = -1;
183 char fpath[MAXPGPATH];
184
186
187 snprintf(fpath, MAXPGPATH, "%s/%s", directory, fname);
188 fd = open(fpath, O_RDONLY | PG_BINARY, 0);
189
190 if (fd < 0 && errno != ENOENT)
191 pg_fatal("could not open file \"%s\": %m", fname);
192 return fd;
193}
#define PG_BINARY
Definition c.h:1376
static const char * directory
Definition zic.c:648

References Assert, directory, fb(), fd(), MAXPGPATH, PG_BINARY, pg_fatal, and snprintf.

Referenced by init_archive_reader(), main(), search_directory(), TarWALDumpReadPage(), and WALDumpOpenSegment().

◆ read_archive_wal_page()

int read_archive_wal_page ( XLogDumpPrivate * privateInfo,
XLogRecPtr  targetPagePtr,
Size  count,
char * readBuff 
)
extern

Definition at line 281 of file archive_waldump.c.

283{
284 char *p = readBuff;
285 Size nbytes = count;
287 int segsize = privateInfo->segsize;
288 XLogSegNo segno;
289 char fname[MAXFNAMELEN];
290 ArchivedWALFile *entry;
291
292 /* Identify the segment and locate its entry in the archive hash */
293 XLByteToSeg(targetPagePtr, segno, segsize);
294 XLogFileName(fname, privateInfo->timeline, segno, segsize);
295 entry = get_archive_wal_entry(fname, privateInfo);
296
297 while (nbytes > 0)
298 {
299 char *buf = entry->buf->data;
300 int bufLen = entry->buf->len;
303
304 /*
305 * Calculate the LSN range currently residing in the buffer.
306 *
307 * read_len tracks total bytes received for this segment (including
308 * already-discarded data), so endPtr is the LSN just past the last
309 * buffered byte, and startPtr is the LSN of the first buffered byte.
310 */
311 XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
313
314 /*
315 * Copy the requested WAL record if it exists in the buffer.
316 */
317 if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
318 {
319 int copyBytes;
320 int offset = recptr - startPtr;
321
322 /*
323 * Given startPtr <= recptr < endPtr and a total buffer size
324 * 'bufLen', the offset (recptr - startPtr) will always be less
325 * than 'bufLen'.
326 */
327 Assert(offset < bufLen);
328
329 copyBytes = Min(nbytes, bufLen - offset);
330 memcpy(p, buf + offset, copyBytes);
331
332 /* Update state for read */
333 recptr += copyBytes;
334 nbytes -= copyBytes;
335 p += copyBytes;
336 }
337 else
338 {
339 /*
340 * Before starting the actual decoding loop, pg_waldump tries to
341 * locate the first valid record from the user-specified start
342 * position, which might not be the start of a WAL record and
343 * could fall in the middle of a record that spans multiple pages.
344 * Consequently, the valid start position the decoder is looking
345 * for could be far away from that initial position.
346 *
347 * This may involve reading across multiple pages, and this
348 * pre-reading fetches data in multiple rounds from the archive
349 * streamer; normally, we would throw away existing buffer
350 * contents to fetch the next set of data, but that existing data
351 * might be needed once the main loop starts. Because previously
352 * read data cannot be re-read by the archive streamer, we delay
353 * resetting the buffer until the main decoding loop is entered.
354 *
355 * Once pg_waldump has entered the main loop, it may re-read the
356 * currently active page, but never an older one; therefore, any
357 * fully consumed WAL data preceding the current page can then be
358 * safely discarded.
359 */
360 if (privateInfo->decoding_started)
361 {
362 resetStringInfo(entry->buf);
363
364 /*
365 * Push back the partial page data for the current page to the
366 * buffer, ensuring a full page remains available for
367 * re-reading if requested.
368 */
369 if (p > readBuff)
370 {
371 Assert((count - nbytes) > 0);
372 appendBinaryStringInfo(entry->buf, readBuff, count - nbytes);
373 }
374 }
375
376 /*
377 * Now, fetch more data. Raise an error if the archive streamer
378 * has moved past our segment (meaning the WAL file in the archive
379 * is shorter than expected) or if reading the archive reached
380 * EOF.
381 */
382 if (privateInfo->cur_file != entry)
383 pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %lld of %lld bytes",
384 fname, privateInfo->archive_name,
385 (long long int) (count - nbytes),
386 (long long int) count);
387 if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
388 pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
389 privateInfo->archive_name, fname,
390 (long long int) (count - nbytes),
391 (long long int) count);
392 }
393 }
394
395 /*
396 * Should have successfully read all the requested bytes or reported a
397 * failure before this point.
398 */
399 Assert(nbytes == 0);
400
401 /*
402 * Return count unchanged; the caller expects this convention, matching
403 * the routine that reads WAL pages from physical files.
404 */
405 return count;
406}
static ArchivedWALFile * get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
#define Min(x, y)
Definition c.h:1093
size_t Size
Definition c.h:691
static char buf[DEFAULT_XLOG_SEG_SIZE]
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
bool decoding_started
Definition pg_waldump.h:32
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define MAXFNAMELEN
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
uint64 XLogRecPtr
Definition xlogdefs.h:21

References appendBinaryStringInfo(), XLogDumpPrivate::archive_name, Assert, buf, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, StringInfoData::data, XLogDumpPrivate::decoding_started, fb(), get_archive_wal_entry(), StringInfoData::len, MAXFNAMELEN, Min, pg_fatal, read_archive_file(), READ_CHUNK_SIZE, ArchivedWALFile::read_len, resetStringInfo(), XLogDumpPrivate::segsize, XLogDumpPrivate::timeline, XLByteToSeg, XLogFileName(), and XLogSegNoOffsetToRecPtr.

Referenced by TarWALDumpReadPage().

Variable Documentation

◆ TmpWalSegDir