PostgreSQL Source Code git master
Loading...
Searching...
No Matches
archive_waldump.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * archive_waldump.c
4 * A generic facility for reading WAL data from tar archives via archive
5 * streamer.
6 *
7 * Portions Copyright (c) 2026, PostgreSQL Global Development Group
8 *
9 * IDENTIFICATION
10 * src/bin/pg_waldump/archive_waldump.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres_fe.h"
16
17#include <unistd.h>
18
20#include "common/file_perm.h"
21#include "common/hashfn.h"
22#include "common/logging.h"
24#include "pg_waldump.h"
25
26/*
27 * How many bytes should we try to read from a file at once?
28 */
29#define READ_CHUNK_SIZE (128 * 1024)
30
31/* Temporary directory for spilled WAL segment files */
33
34/*
35 * Check if the start segment number is zero; this indicates a request to read
36 * any WAL file.
37 */
38#define READ_ANY_WAL(privateInfo) ((privateInfo)->start_segno == 0)
39
40/*
41 * Hash entry representing a WAL segment retrieved from the archive.
42 *
43 * While WAL segments are typically read sequentially, individual entries
44 * maintain their own buffers for the following reasons:
45 *
46 * 1. Boundary Handling: The archive streamer provides a continuous byte
47 * stream. A single streaming chunk may contain the end of one WAL segment
48 * and the start of the next. Separate buffers allow us to easily
49 * partition and track these bytes by their respective segments.
50 *
51 * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
52 * are archived or retrieved out of sequence.
53 *
54 * To minimize the memory footprint, entries and their associated buffers are
55 * freed once consumed. Since pg_waldump does not request the same bytes
56 * twice (after it's located the point at which it should start decoding),
57 * a segment can be discarded as soon as pg_waldump moves past it. Moreover,
58 * if we read a segment that won't be needed till later, we spill its data to
59 * a temporary file instead of retaining it in memory. This ensures that
60 * pg_waldump can process even very large tar archives without needing more
61 * than a few WAL segments' worth of memory space.
62 */
63typedef struct ArchivedWALFile
64{
65 uint32 status; /* hash status */
66 const char *fname; /* hash key: WAL segment name */
67
68 StringInfo buf; /* holds WAL bytes read from archive */
69 bool spilled; /* true if the WAL data was spilled to a
70 * temporary file */
71
72 int read_len; /* total bytes received from archive for this
73 * segment (same as buf->len, unless we have
74 * spilled the data to a temp file) */
76
77static uint32 hash_string_pointer(const char *s);
78#define SH_PREFIX ArchivedWAL
79#define SH_ELEMENT_TYPE ArchivedWALFile
80#define SH_KEY_TYPE const char *
81#define SH_KEY fname
82#define SH_HASH_KEY(tb, key) hash_string_pointer(key)
83#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
84#define SH_SCOPE static inline
85#define SH_RAW_ALLOCATOR pg_malloc0
86#define SH_DECLARE
87#define SH_DEFINE
88#include "lib/simplehash.h"
89
95
96static ArchivedWALFile *get_archive_wal_entry(const char *fname,
97 XLogDumpPrivate *privateInfo);
98static bool read_archive_file(XLogDumpPrivate *privateInfo);
99static void setup_tmpwal_dir(const char *waldir);
100
101static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
102static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);
103
105static void astreamer_waldump_content(astreamer *streamer,
106 astreamer_member *member,
107 const char *data, int len,
109static void astreamer_waldump_finalize(astreamer *streamer);
110static void astreamer_waldump_free(astreamer *streamer);
111
113 astreamer_member *member,
114 char **fname);
115
121
122/*
123 * Initializes the tar archive reader: opens the archive, builds a hash table
124 * for WAL entries, reads ahead until a full WAL page header is available to
125 * determine the WAL segment size, and computes start/end segment numbers for
126 * filtering.
127 */
128void
130 pg_compress_algorithm compression)
131{
132 int fd;
133 astreamer *streamer;
134 ArchivedWALFile *entry = NULL;
137
138 /* Open tar archive and store its file descriptor */
140 privateInfo->archive_name);
141
142 if (fd < 0)
143 pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
144
145 privateInfo->archive_fd = fd;
146 privateInfo->archive_fd_eof = false;
147
148 streamer = astreamer_waldump_new(privateInfo);
149
150 /* We must first parse the tar archive. */
151 streamer = astreamer_tar_parser_new(streamer);
152
153 /* If the archive is compressed, decompress before parsing. */
154 if (compression == PG_COMPRESSION_GZIP)
155 streamer = astreamer_gzip_decompressor_new(streamer);
156 else if (compression == PG_COMPRESSION_LZ4)
157 streamer = astreamer_lz4_decompressor_new(streamer);
158 else if (compression == PG_COMPRESSION_ZSTD)
159 streamer = astreamer_zstd_decompressor_new(streamer);
160
161 privateInfo->archive_streamer = streamer;
162
163 /*
164 * Allocate a buffer for reading the archive file to begin content
165 * decoding.
166 */
169
170 /*
171 * Hash table storing WAL entries read from the archive with an arbitrary
172 * initial size.
173 */
174 privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
175
176 /*
177 * Read until we have at least one WAL segment with enough data to extract
178 * the WAL segment size from the long page header.
179 *
180 * We must not rely on cur_file here, because it can become NULL if a
181 * member trailer is processed during a read_archive_file() call. Instead,
182 * scan the hash table after each read to find any entry with sufficient
183 * data.
184 */
185 while (entry == NULL)
186 {
187 if (!read_archive_file(privateInfo))
188 pg_fatal("could not find WAL in archive \"%s\"",
189 privateInfo->archive_name);
190
191 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
192 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
193 &iter)) != NULL)
194 {
195 if (entry->read_len >= sizeof(XLogLongPageHeaderData))
196 break;
197 }
198 }
199
200 /* Extract the WAL segment size from the long page header */
202
203 if (!IsValidWalSegSize(longhdr->xlp_seg_size))
204 {
205 pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
206 "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
207 longhdr->xlp_seg_size),
208 privateInfo->archive_name, longhdr->xlp_seg_size);
209 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
210 exit(1);
211 }
212
213 privateInfo->segsize = longhdr->xlp_seg_size;
214
215 /*
216 * With the WAL segment size available, we can now initialize the
217 * dependent start and end segment numbers.
218 */
219 Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
220 XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
221 privateInfo->segsize);
222
223 if (!XLogRecPtrIsInvalid(privateInfo->endptr))
224 XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
225 privateInfo->segsize);
226
227 /*
228 * Now that we have initialized the filtering parameters (start_segno and
229 * end_segno), we can discard any already-loaded WAL hash table entries
230 * for segments we don't actually need. Subsequent WAL will be filtered
231 * automatically by the archive streamer using the updated start_segno and
232 * end_segno values.
233 */
234 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
235 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
236 &iter)) != NULL)
237 {
238 XLogSegNo segno;
239 TimeLineID timeline;
240
241 XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
242 if (privateInfo->timeline != timeline ||
243 privateInfo->start_segno > segno ||
244 privateInfo->end_segno < segno)
245 free_archive_wal_entry(entry->fname, privateInfo);
246 }
247}
248
249/*
250 * Release the archive streamer chain and close the archive file.
251 */
252void
254{
255 /*
256 * NB: Normally, astreamer_finalize() is called before astreamer_free() to
257 * flush any remaining buffered data or to ensure the end of the tar
258 * archive is reached. read_archive_file() may have done so. However,
259 * when decoding WAL we can stop once we hit the end LSN, so we may never
260 * have read all of the input file. In that case any remaining buffered
261 * data or unread portion of the archive can be safely ignored.
262 */
263 astreamer_free(privateInfo->archive_streamer);
264
265 /* Free any remaining hash table entries and their buffers. */
266 if (privateInfo->archive_wal_htab != NULL)
267 {
269 ArchivedWALFile *entry;
270
271 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
272 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
273 &iter)) != NULL)
274 {
275 if (entry->buf != NULL)
276 destroyStringInfo(entry->buf);
277 }
279 privateInfo->archive_wal_htab = NULL;
280 }
281
282 /* Free the reusable read buffer. */
283 if (privateInfo->archive_read_buf != NULL)
284 {
285 pg_free(privateInfo->archive_read_buf);
286 privateInfo->archive_read_buf = NULL;
287 }
288
289 /* Close the file. */
290 if (close(privateInfo->archive_fd) != 0)
291 pg_log_error("could not close file \"%s\": %m",
292 privateInfo->archive_name);
293}
294
295/*
296 * Copies the requested WAL data from the hash entry's buffer into readBuff.
297 * If the buffer does not yet contain the needed bytes, fetches more data from
298 * the tar archive via the archive streamer.
299 */
300int
302 size_t count, char *readBuff)
303{
304 char *p = readBuff;
305 size_t nbytes = count;
307 int segsize = privateInfo->segsize;
308 XLogSegNo segno;
309 char fname[MAXFNAMELEN];
310 ArchivedWALFile *entry;
311
312 /* Identify the segment and locate its entry in the archive hash */
313 XLByteToSeg(targetPagePtr, segno, segsize);
314 XLogFileName(fname, privateInfo->timeline, segno, segsize);
315 entry = get_archive_wal_entry(fname, privateInfo);
316 Assert(!entry->spilled);
317
318 while (nbytes > 0)
319 {
320 char *buf = entry->buf->data;
321 int bufLen = entry->buf->len;
324
325 /*
326 * Calculate the LSN range currently residing in the buffer.
327 *
328 * read_len tracks total bytes received for this segment, so endPtr is
329 * the LSN just past the last buffered byte, and startPtr is the LSN
330 * of the first buffered byte.
331 */
332 XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
334
335 /*
336 * Copy the requested WAL record if it exists in the buffer.
337 */
338 if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
339 {
340 int copyBytes;
341 int offset = recptr - startPtr;
342
343 /*
344 * Given startPtr <= recptr < endPtr and a total buffer size
345 * 'bufLen', the offset (recptr - startPtr) will always be less
346 * than 'bufLen'.
347 */
348 Assert(offset < bufLen);
349
350 copyBytes = Min(nbytes, bufLen - offset);
351 memcpy(p, buf + offset, copyBytes);
352
353 /* Update state for read */
354 recptr += copyBytes;
355 nbytes -= copyBytes;
356 p += copyBytes;
357 }
358 else
359 {
360 /*
361 * We evidently need to fetch more data. Raise an error if the
362 * archive streamer has moved past our segment (meaning the WAL
363 * file in the archive is shorter than expected) or if reading the
364 * archive reached EOF.
365 */
366 if (privateInfo->cur_file != entry)
367 pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %zu of %zu bytes",
368 fname, privateInfo->archive_name,
369 (count - nbytes), count);
370 if (!read_archive_file(privateInfo))
371 pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %zu of %zu bytes",
372 privateInfo->archive_name, fname,
373 (count - nbytes), count);
374
375 /*
376 * Loading more data may have moved hash table entries, so we must
377 * re-look-up the one we are reading from.
378 */
379 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
380 /* ... it had better still be there */
381 Assert(entry != NULL);
382 }
383 }
384
385 /*
386 * Should have successfully read all the requested bytes or reported a
387 * failure before this point.
388 */
389 Assert(nbytes == 0);
390
391 /*
392 * Return count unchanged; the caller expects this convention, matching
393 * the routine that reads WAL pages from physical files.
394 */
395 return count;
396}
397
398/*
399 * Releases the buffer of a WAL entry that is no longer needed, preventing the
400 * accumulation of irrelevant WAL data. Also removes any associated temporary
401 * file and clears privateInfo->cur_file if it points to this entry, so the
402 * archive streamer skips subsequent data for it.
403 */
404void
405free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
406{
407 ArchivedWALFile *entry;
408 const char *oldfname;
409
410 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
411
412 if (entry == NULL)
413 return;
414
415 /* Destroy the buffer */
416 destroyStringInfo(entry->buf);
417 entry->buf = NULL;
418
419 /* Remove temporary file if any */
420 if (entry->spilled)
421 {
422 char fpath[MAXPGPATH];
423
424 snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
425
426 if (unlink(fpath) == 0)
427 pg_log_debug("removed file \"%s\"", fpath);
428 }
429
430 /* Clear cur_file if it points to the entry being freed */
431 if (privateInfo->cur_file == entry)
432 privateInfo->cur_file = NULL;
433
434 /*
435 * ArchivedWAL_delete_item may cause other hash table entries to move.
436 * Therefore, if cur_file isn't NULL now, we have to be prepared to look
437 * that entry up again after the deletion. Fortunately, the entry's fname
438 * string won't move.
439 */
440 oldfname = privateInfo->cur_file ? privateInfo->cur_file->fname : NULL;
441
442 ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
443
444 if (oldfname)
445 {
446 privateInfo->cur_file = ArchivedWAL_lookup(privateInfo->archive_wal_htab,
447 oldfname);
448 /* ... it had better still be there */
449 Assert(privateInfo->cur_file != NULL);
450 }
451}
452
453/*
454 * Returns the archived WAL entry from the hash table if it already exists.
455 * Otherwise, reads more data from the archive until the requested entry is
456 * found. If the archive streamer reads a WAL file from the archive that
457 * is not currently needed, that data is spilled to a temporary file for later
458 * retrieval.
459 *
460 * Note that the returned entry might not have been completely read from
461 * the archive yet.
462 */
463static ArchivedWALFile *
464get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
465{
466 while (1)
467 {
468 ArchivedWALFile *entry;
470
471 /*
472 * Search the hash table first. If the entry is found, return it.
473 * Otherwise, the requested WAL entry hasn't been read from the
474 * archive yet; we must invoke the archive streamer to fetch it.
475 */
476 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
477
478 if (entry != NULL)
479 return entry;
480
481 /*
482 * Before loading more data, scan the hash table to see if we have
483 * loaded any files we don't need yet. If so, spill their data to
484 * disk to conserve memory space. But don't try to spill a
485 * partially-read file; it's not worth the complication.
486 */
487 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
488 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
489 &iter)) != NULL)
490 {
491 FILE *write_fp;
492
493 /* OK to spill? */
494 if (entry->spilled)
495 continue; /* already spilled */
496 if (entry == privateInfo->cur_file)
497 continue; /* still being read */
498
499 /* Write out the completed WAL file contents to a temp file. */
500 write_fp = prepare_tmp_write(entry->fname, privateInfo);
501 perform_tmp_write(entry->fname, entry->buf, write_fp);
502 if (fclose(write_fp) != 0)
503 pg_fatal("could not close file \"%s/%s\": %m",
504 TmpWalSegDir, entry->fname);
505
506 /* resetStringInfo won't release storage, so delete/recreate. */
507 destroyStringInfo(entry->buf);
508 entry->buf = makeStringInfo();
509 entry->spilled = true;
510 }
511
512 /*
513 * Read more data. If we reach EOF, the desired file is not present.
514 */
515 if (!read_archive_file(privateInfo))
516 pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
517 fname, privateInfo->archive_name);
518 }
519}
520
521/*
522 * Reads a chunk from the archive file and passes it through the streamer
523 * pipeline for decompression (if needed) and tar member extraction.
524 *
525 * Returns true if successful, false if there is no more data.
526 *
527 * Callers must be aware that a single call may trigger multiple callbacks
528 * in astreamer_waldump_content, so privateInfo->cur_file can change value
529 * (or become NULL) during a call. In particular, cur_file is set to NULL
530 * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar
531 * member; it is then set to a new entry when the next WAL member's
532 * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen
533 * within the same call.
534 */
535static bool
537{
538 int rc;
539
540 /* Fail if we already reached EOF in a prior call. */
541 if (privateInfo->archive_fd_eof)
542 return false;
543
544 /* Try to read some more data. */
545 rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf,
546 privateInfo->archive_read_buf_size);
547 if (rc < 0)
548 pg_fatal("could not read file \"%s\": %m",
549 privateInfo->archive_name);
550
551 /*
552 * Decompress (if required), and then parse the previously read contents
553 * of the tar file.
554 */
555 if (rc > 0)
557 privateInfo->archive_read_buf, rc,
559 else
560 {
561 /*
562 * We reached EOF, but there is probably still data queued in the
563 * astreamer pipeline's buffers. Flush it out to ensure that we
564 * process everything.
565 */
567 /* Set flag to ensure we don't finalize more than once. */
568 privateInfo->archive_fd_eof = true;
569 }
570
571 return true;
572}
573
574/*
575 * Set up a temporary directory to temporarily store WAL segments.
576 */
577static void
579{
580 const char *tmpdir = getenv("TMPDIR");
581 char *template;
582
584
585 /*
586 * Use the directory specified by the TMPDIR environment variable. If it's
587 * not set, fall back to the provided WAL directory to store WAL files
588 * temporarily.
589 */
590 template = psprintf("%s/waldump_tmp-XXXXXX",
591 tmpdir ? tmpdir : waldir);
592 TmpWalSegDir = mkdtemp(template);
593
594 if (TmpWalSegDir == NULL)
595 pg_fatal("could not create directory \"%s\": %m", template);
596
598
599 pg_log_debug("created directory \"%s\"", TmpWalSegDir);
600}
601
602/*
603 * Open a file in the temporary spill directory for writing an out-of-order
604 * WAL segment, creating the directory if not already done.
605 * Returns the open file handle.
606 */
607static FILE *
608prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
609{
610 char fpath[MAXPGPATH];
611 FILE *file;
612
613 /* Setup temporary directory to store WAL segments, if we didn't already */
614 if (unlikely(TmpWalSegDir == NULL))
615 setup_tmpwal_dir(privateInfo->archive_dir);
616
617 snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
618
619 /* Open the spill file for writing */
620 file = fopen(fpath, PG_BINARY_W);
621 if (file == NULL)
622 pg_fatal("could not create file \"%s\": %m", fpath);
623
624#ifndef WIN32
626 pg_fatal("could not set permissions on file \"%s\": %m",
627 fpath);
628#endif
629
630 pg_log_debug("spilling to temporary file \"%s\"", fpath);
631
632 return file;
633}
634
635/*
636 * Write buffer data to the given file handle.
637 */
638static void
639perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
640{
641 Assert(file);
642
643 errno = 0;
644 if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
645 {
646 /*
647 * If write didn't set errno, assume problem is no disk space
648 */
649 if (errno == 0)
650 errno = ENOSPC;
651 pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
652 }
653}
654
655/*
656 * Create an astreamer that can read WAL from tar file.
657 */
658static astreamer *
660{
661 astreamer_waldump *streamer;
662
664 *((const astreamer_ops **) &streamer->base.bbs_ops) =
666
667 streamer->privateInfo = privateInfo;
668
669 return &streamer->base;
670}
671
672/*
673 * Main entry point of the archive streamer for reading WAL data from a tar
674 * file. If a member is identified as a valid WAL file, a hash entry is created
675 * for it, and its contents are copied into that entry's buffer, making them
676 * accessible to the decoding routine.
677 */
678static void
680 const char *data, int len,
682{
684 XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
685
686 Assert(context != ASTREAMER_UNKNOWN);
687
688 switch (context)
689 {
691 {
692 char *fname = NULL;
693 ArchivedWALFile *entry;
694 bool found;
695
696 /* Shouldn't see MEMBER_HEADER in the middle of a file */
697 Assert(privateInfo->cur_file == NULL);
698
699 pg_log_debug("reading \"%s\"", member->pathname);
700
701 if (!member_is_wal_file(mystreamer, member, &fname))
702 break;
703
704 /*
705 * Skip range filtering during initial startup, before the WAL
706 * segment size and segment number bounds are known.
707 */
708 if (!READ_ANY_WAL(privateInfo))
709 {
710 XLogSegNo segno;
711 TimeLineID timeline;
712
713 /*
714 * Skip the segment if the timeline does not match or if it
715 * falls outside the caller-specified range.
716 */
717 XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
718 if (privateInfo->timeline != timeline ||
719 privateInfo->start_segno > segno ||
720 privateInfo->end_segno < segno)
721 {
722 pfree(fname);
723 break;
724 }
725 }
726
727 /*
728 * Note: ArchivedWAL_insert may cause existing hash table
729 * entries to move. While cur_file is known to be NULL right
730 * now, read_archive_wal_page may have a live hash entry
731 * pointer, which it needs to take care to update after
732 * read_archive_file completes.
733 */
734 entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
735 fname, &found);
736
737 /*
738 * Shouldn't happen, but if it does, simply ignore the
739 * duplicate WAL file.
740 */
741 if (found)
742 {
743 pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
744 member->pathname, privateInfo->archive_name);
745 pfree(fname);
746 break;
747 }
748
749 entry->buf = makeStringInfo();
750 entry->spilled = false;
751 entry->read_len = 0;
752 privateInfo->cur_file = entry;
753 }
754 break;
755
757 if (privateInfo->cur_file)
758 {
759 appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
760 privateInfo->cur_file->read_len += len;
761 }
762 break;
763
765
766 /*
767 * End of this tar member; mark cur_file NULL so subsequent
768 * content callbacks (if any) know no WAL file is currently
769 * active.
770 */
771 privateInfo->cur_file = NULL;
772 break;
773
775 break;
776
777 default:
778 /* Shouldn't happen. */
779 pg_fatal("unexpected state while parsing tar file");
780 }
781}
782
783/*
784 * End-of-stream processing for an astreamer_waldump stream. This is a
785 * terminal streamer so it must have no successor.
786 */
787static void
789{
790 Assert(streamer->bbs_next == NULL);
791}
792
793/*
794 * Free memory associated with an astreamer_waldump stream.
795 */
796static void
798{
799 Assert(streamer->bbs_next == NULL);
800 pfree(streamer);
801}
802
803/*
804 * Returns true if the archive member name matches the WAL naming format. If
805 * successful, it also outputs the WAL segment name.
806 */
807static bool
809 char **fname)
810{
811 int pathlen;
812 char pathname[MAXPGPATH];
813 char *filename;
814
815 /* We are only interested in normal files */
816 if (!member->is_regular)
817 return false;
818
819 if (strlen(member->pathname) < XLOG_FNAME_LEN)
820 return false;
821
822 /*
823 * For a correct comparison, we must remove any '.' or '..' components
824 * from the member pathname. Similar to member_verify_header(), we prepend
825 * './' to the path so that canonicalize_path() can properly resolve and
826 * strip these references from the tar member name.
827 */
828 snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
829 canonicalize_path(pathname);
830 pathlen = strlen(pathname);
831
832 /* Skip files in subdirectories other than pg_wal/ */
833 if (pathlen > XLOG_FNAME_LEN &&
834 strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
835 return false;
836
837 /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
838 filename = pathname + (pathlen - XLOG_FNAME_LEN);
840 return false;
841
843
844 return true;
845}
846
847/*
848 * Helper function for WAL file hash table.
849 */
850static uint32
852{
853 const unsigned char *ss = (const unsigned char *) s;
854
855 return hash_bytes(ss, strlen(s));
856}
int read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, size_t count, char *readBuff)
static void astreamer_waldump_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static astreamer * astreamer_waldump_new(XLogDumpPrivate *privateInfo)
char * TmpWalSegDir
#define READ_ANY_WAL(privateInfo)
static uint32 hash_string_pointer(const char *s)
static const astreamer_ops astreamer_waldump_ops
static void astreamer_waldump_free(astreamer *streamer)
static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
#define READ_CHUNK_SIZE
void free_archive_reader(XLogDumpPrivate *privateInfo)
static void astreamer_waldump_finalize(astreamer *streamer)
static bool member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member, char **fname)
static ArchivedWALFile * get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
static FILE * prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
static bool read_archive_file(XLogDumpPrivate *privateInfo)
static void setup_tmpwal_dir(const char *waldir)
void free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
void init_archive_reader(XLogDumpPrivate *privateInfo, pg_compress_algorithm compression)
static void astreamer_free(astreamer *streamer)
Definition astreamer.h:155
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:137
static void astreamer_finalize(astreamer *streamer)
Definition astreamer.h:147
astreamer_archive_context
Definition astreamer.h:63
@ ASTREAMER_MEMBER_HEADER
Definition astreamer.h:65
@ ASTREAMER_MEMBER_CONTENTS
Definition astreamer.h:66
@ ASTREAMER_MEMBER_TRAILER
Definition astreamer.h:67
@ ASTREAMER_ARCHIVE_TRAILER
Definition astreamer.h:68
@ ASTREAMER_UNKNOWN
Definition astreamer.h:64
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
astreamer * astreamer_tar_parser_new(astreamer *next)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
#define Min(x, y)
Definition c.h:1091
#define ngettext(s, p, n)
Definition c.h:1270
#define Assert(condition)
Definition c.h:943
#define unlikely(x)
Definition c.h:438
uint32_t uint32
Definition c.h:624
#define PG_BINARY_W
Definition c.h:1377
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
pg_compress_algorithm
Definition compression.h:22
@ PG_COMPRESSION_GZIP
Definition compression.h:24
@ PG_COMPRESSION_LZ4
Definition compression.h:25
@ PG_COMPRESSION_ZSTD
Definition compression.h:26
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
void pg_free(void *ptr)
#define palloc0_object(type)
Definition fe_memutils.h:75
int pg_file_create_mode
Definition file_perm.c:19
uint32 hash_bytes(const unsigned char *k, int keylen)
Definition hashfn.c:146
#define close(a)
Definition win32.h:12
#define read(a, b, c)
Definition win32.h:13
#define pg_log_error(...)
Definition logging.h:106
#define pg_log_error_detail(...)
Definition logging.h:109
#define pg_log_debug(...)
Definition logging.h:133
void pfree(void *pointer)
Definition mcxt.c:1616
char * pnstrdup(const char *in, Size len)
Definition mcxt.c:1792
#define pg_fatal(...)
#define MAXPGPATH
const void size_t len
const void * data
static char * filename
Definition pg_dumpall.c:133
static char buf[DEFAULT_XLOG_SEG_SIZE]
int open_file_in_directory(const char *directory, const char *fname)
Definition pg_waldump.c:182
#define pg_log_warning(...)
Definition pgfnames.c:24
void canonicalize_path(char *path)
Definition path.c:337
#define snprintf
Definition port.h:260
char * mkdtemp(char *path)
Definition mkdtemp.c:286
static int fd(const char *x, int i)
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
void destroyStringInfo(StringInfo str)
Definition stringinfo.c:409
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
const char * fname
XLogSegNo start_segno
Definition pg_waldump.h:67
struct ArchivedWAL_hash * archive_wal_htab
Definition pg_waldump.h:59
XLogRecPtr endptr
Definition pg_waldump.h:30
astreamer * archive_streamer
Definition pg_waldump.h:40
char * archive_read_buf
Definition pg_waldump.h:41
XLogRecPtr startptr
Definition pg_waldump.h:29
char * archive_name
Definition pg_waldump.h:36
TimeLineID timeline
Definition pg_waldump.h:27
char * archive_dir
Definition pg_waldump.h:35
XLogSegNo end_segno
Definition pg_waldump.h:68
struct ArchivedWALFile * cur_file
Definition pg_waldump.h:52
size_t archive_read_buf_size
Definition pg_waldump.h:42
char pathname[MAXPGPATH]
Definition astreamer.h:81
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:128
XLogDumpPrivate * privateInfo
const astreamer_ops * bbs_ops
Definition astreamer.h:111
astreamer * bbs_next
Definition astreamer.h:112
#define IsValidWalSegSize(size)
XLogLongPageHeaderData * XLogLongPageHeader
static bool IsXLogFileName(const char *fname)
static void XLogFromFileName(const char *fname, TimeLineID *tli, XLogSegNo *logSegNo, int wal_segsz_bytes)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define MAXFNAMELEN
#define XLOGDIR
#define XLOG_FNAME_LEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLogRecPtrIsInvalid(r)
Definition xlogdefs.h:30
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
uint64 XLogSegNo
Definition xlogdefs.h:52